2
0

allocation.go 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/cloud"
  9. "github.com/kubecost/cost-model/pkg/env"
  10. "github.com/kubecost/cost-model/pkg/kubecost"
  11. "github.com/kubecost/cost-model/pkg/log"
  12. "github.com/kubecost/cost-model/pkg/prom"
  13. "github.com/kubecost/cost-model/pkg/util"
  14. "k8s.io/apimachinery/pkg/labels"
  15. )
  // Prometheus query templates used to compute allocations. Templates ending
  // in `[%s]%s` are formatted with (duration, offset); the subquery templates
  // ending in `[%s:%s]%s` are formatted with (duration, resolution, offset).
  const (
  	// Pod running status (subquery: duration, resolution, offset).
  	queryFmtPods = `avg(kube_pod_container_status_running{}) by (pod, namespace, cluster_id)[%s:%s]%s`

  	// Per-container RAM, CPU, and GPU allocation, requests, and usage.
  	queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  	queryFmtRAMRequests       = `avg(avg_over_time(kube_pod_container_resource_requests_memory_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  	queryFmtRAMUsage          = `avg(avg_over_time(container_memory_working_set_bytes{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
  	queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  	queryFmtCPURequests       = `avg(avg_over_time(kube_pod_container_resource_requests_cpu_cores{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  	queryFmtCPUUsage          = `avg(rate(container_cpu_usage_seconds_total{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
  	queryFmtGPUsRequested     = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`

  	// Per-node hourly resource prices and spot status.
  	queryFmtNodeCostPerCPUHr    = `avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  	queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  	queryFmtNodeCostPerGPUHr    = `avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  	queryFmtNodeIsSpot          = `avg_over_time(kubecost_node_is_spot[%s]%s)`

  	// Persistent volumes and claims. queryFmtPVCInfo is a subquery
  	// (duration, resolution, offset).
  	queryFmtPVCInfo           = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, cluster_id)[%s:%s]%s`
  	queryFmtPVBytes           = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (persistentvolume, cluster_id)`
  	queryFmtPodPVCAllocation  = `avg(avg_over_time(pod_pvc_allocation[%s]%s)) by (persistentvolume, persistentvolumeclaim, pod, namespace, cluster_id)`
  	queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)) by (persistentvolumeclaim, namespace, cluster_id)`
  	queryFmtPVCostPerGiBHour  = `avg(avg_over_time(pv_hourly_cost[%s]%s)) by (volumename, cluster_id)`

  	// Network egress volume (GiB) and price per GiB, split by zone, region,
  	// and internet traffic.
  	queryFmtNetZoneGiB            = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  	queryFmtNetZoneCostPerGiB     = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s]%s)) by (cluster_id)`
  	queryFmtNetRegionGiB          = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  	queryFmtNetRegionCostPerGiB   = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (cluster_id)`
  	queryFmtNetInternetGiB        = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  	queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (cluster_id)`

  	// Labels and annotations for namespaces and pods.
  	queryFmtNamespaceLabels      = `avg_over_time(kube_namespace_labels[%s]%s)`
  	queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations[%s]%s)`
  	queryFmtPodLabels            = `avg_over_time(kube_pod_labels[%s]%s)`
  	queryFmtPodAnnotations       = `avg_over_time(kube_pod_annotations[%s]%s)`

  	// Service and controller (Deployment, StatefulSet, DaemonSet, Job) lookups.
  	queryFmtServiceLabels     = `avg_over_time(service_selector_labels[%s]%s)`
  	queryFmtDeploymentLabels  = `avg_over_time(deployment_match_labels[%s]%s)`
  	queryFmtStatefulSetLabels = `avg_over_time(statefulSet_match_labels[%s]%s)`
  	queryFmtDaemonSetLabels   = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s]%s)) by (pod, owner_name, namespace, cluster_id)`
  	queryFmtJobLabels         = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s]%s)) by (pod, owner_name, namespace ,cluster_id)`
  )
  50. // ComputeAllocation uses the CostModel instance to compute an AllocationSet
  51. // for the window defined by the given start and end times. The Allocations
  52. // returned are unaggregated (i.e. down to the container level).
  53. func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  54. // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
  55. // 2. Run and apply the results of the remaining queries to
  56. // 3. Build out AllocationSet from completed Pod map
  57. // Create a window spanning the requested query
  58. window := kubecost.NewWindow(&start, &end)
  59. // Create an empty AllocationSet. For safety, in the case of an error, we
  60. // should prefer to return this empty set with the error. (In the case of
  61. // no error, of course we populate the set and return it.)
  62. allocSet := kubecost.NewAllocationSet(start, end)
  63. // (1) Build out Pod map
  64. // Build out a map of Allocations as a mapping from pod-to-container-to-
  65. // underlying-Allocation instance, starting with (start, end) so that we
  66. // begin with minutes, from which we compute resource allocation and cost
  67. // totals from measured rate data.
  68. podMap := map[podKey]*Pod{}
  69. // clusterStarts and clusterEnds record the earliest start and latest end
  70. // times, respectively, on a cluster-basis. These are used for unmounted
  71. // PVs and other "virtual" Allocations so that minutes are maximally
  72. // accurate during start-up or spin-down of a cluster
  73. clusterStart := map[string]time.Time{}
  74. clusterEnd := map[string]time.Time{}
  75. cm.buildPodMap(window, resolution, env.GetETLMaxBatchDuration(), podMap, clusterStart, clusterEnd)
  76. // (2) Run and apply remaining queries
  77. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  78. // including handling Thanos offset
  79. durStr, offStr, err := window.DurationOffsetForPrometheus()
  80. if err != nil {
  81. // Negative duration, so return empty set
  82. return allocSet, nil
  83. }
  84. // Convert resolution duration to a query-ready string
  85. resStr := util.DurationString(resolution)
  86. ctx := prom.NewContext(cm.PrometheusClient)
  87. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, offStr)
  88. resChRAMBytesAllocated := ctx.Query(queryRAMBytesAllocated)
  89. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, offStr)
  90. resChRAMRequests := ctx.Query(queryRAMRequests)
  91. queryRAMUsage := fmt.Sprintf(queryFmtRAMUsage, durStr, offStr)
  92. resChRAMUsage := ctx.Query(queryRAMUsage)
  93. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, offStr)
  94. resChCPUCoresAllocated := ctx.Query(queryCPUCoresAllocated)
  95. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, offStr)
  96. resChCPURequests := ctx.Query(queryCPURequests)
  97. queryCPUUsage := fmt.Sprintf(queryFmtCPUUsage, durStr, offStr)
  98. resChCPUUsage := ctx.Query(queryCPUUsage)
  99. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, offStr)
  100. resChGPUsRequested := ctx.Query(queryGPUsRequested)
  101. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, offStr)
  102. resChNodeCostPerCPUHr := ctx.Query(queryNodeCostPerCPUHr)
  103. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, offStr)
  104. resChNodeCostPerRAMGiBHr := ctx.Query(queryNodeCostPerRAMGiBHr)
  105. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, offStr)
  106. resChNodeCostPerGPUHr := ctx.Query(queryNodeCostPerGPUHr)
  107. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr, offStr)
  108. resChNodeIsSpot := ctx.Query(queryNodeIsSpot)
  109. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, durStr, resStr, offStr)
  110. resChPVCInfo := ctx.Query(queryPVCInfo)
  111. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, offStr)
  112. resChPVBytes := ctx.Query(queryPVBytes)
  113. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, offStr)
  114. resChPodPVCAllocation := ctx.Query(queryPodPVCAllocation)
  115. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, offStr)
  116. resChPVCBytesRequested := ctx.Query(queryPVCBytesRequested)
  117. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, offStr)
  118. resChPVCostPerGiBHour := ctx.Query(queryPVCostPerGiBHour)
  119. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, offStr)
  120. resChNetZoneGiB := ctx.Query(queryNetZoneGiB)
  121. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, offStr)
  122. resChNetZoneCostPerGiB := ctx.Query(queryNetZoneCostPerGiB)
  123. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, offStr)
  124. resChNetRegionGiB := ctx.Query(queryNetRegionGiB)
  125. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, offStr)
  126. resChNetRegionCostPerGiB := ctx.Query(queryNetRegionCostPerGiB)
  127. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, offStr)
  128. resChNetInternetGiB := ctx.Query(queryNetInternetGiB)
  129. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, offStr)
  130. resChNetInternetCostPerGiB := ctx.Query(queryNetInternetCostPerGiB)
  131. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr, offStr)
  132. resChNamespaceLabels := ctx.Query(queryNamespaceLabels)
  133. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr, offStr)
  134. resChNamespaceAnnotations := ctx.Query(queryNamespaceAnnotations)
  135. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr, offStr)
  136. resChPodLabels := ctx.Query(queryPodLabels)
  137. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr, offStr)
  138. resChPodAnnotations := ctx.Query(queryPodAnnotations)
  139. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr, offStr)
  140. resChServiceLabels := ctx.Query(queryServiceLabels)
  141. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr, offStr)
  142. resChDeploymentLabels := ctx.Query(queryDeploymentLabels)
  143. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr, offStr)
  144. resChStatefulSetLabels := ctx.Query(queryStatefulSetLabels)
  145. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, offStr)
  146. resChDaemonSetLabels := ctx.Query(queryDaemonSetLabels)
  147. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, offStr)
  148. resChJobLabels := ctx.Query(queryJobLabels)
  149. resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
  150. resCPURequests, _ := resChCPURequests.Await()
  151. resCPUUsage, _ := resChCPUUsage.Await()
  152. resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
  153. resRAMRequests, _ := resChRAMRequests.Await()
  154. resRAMUsage, _ := resChRAMUsage.Await()
  155. resGPUsRequested, _ := resChGPUsRequested.Await()
  156. resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
  157. resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
  158. resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
  159. resNodeIsSpot, _ := resChNodeIsSpot.Await()
  160. resPVBytes, _ := resChPVBytes.Await()
  161. resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
  162. resPVCInfo, _ := resChPVCInfo.Await()
  163. resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
  164. resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
  165. resNetZoneGiB, _ := resChNetZoneGiB.Await()
  166. resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
  167. resNetRegionGiB, _ := resChNetRegionGiB.Await()
  168. resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
  169. resNetInternetGiB, _ := resChNetInternetGiB.Await()
  170. resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
  171. resNamespaceLabels, _ := resChNamespaceLabels.Await()
  172. resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
  173. resPodLabels, _ := resChPodLabels.Await()
  174. resPodAnnotations, _ := resChPodAnnotations.Await()
  175. resServiceLabels, _ := resChServiceLabels.Await()
  176. resDeploymentLabels, _ := resChDeploymentLabels.Await()
  177. resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
  178. resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
  179. resJobLabels, _ := resChJobLabels.Await()
  180. if ctx.HasErrors() {
  181. for _, err := range ctx.Errors() {
  182. log.Errorf("CostModel.ComputeAllocation: %s", err)
  183. }
  184. return allocSet, ctx.ErrorCollection()
  185. }
  186. // We choose to apply allocation before requests in the cases of RAM and
  187. // CPU so that we can assert that allocation should always be greater than
  188. // or equal to request.
  189. applyCPUCoresAllocated(podMap, resCPUCoresAllocated)
  190. applyCPUCoresRequested(podMap, resCPURequests)
  191. applyCPUCoresUsed(podMap, resCPUUsage)
  192. applyRAMBytesAllocated(podMap, resRAMBytesAllocated)
  193. applyRAMBytesRequested(podMap, resRAMRequests)
  194. applyRAMBytesUsed(podMap, resRAMUsage)
  195. applyGPUsRequested(podMap, resGPUsRequested)
  196. applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB)
  197. applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB)
  198. applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB)
  199. namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
  200. podLabels := resToPodLabels(resPodLabels)
  201. namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
  202. podAnnotations := resToPodAnnotations(resPodAnnotations)
  203. applyLabels(podMap, namespaceLabels, podLabels)
  204. applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
  205. serviceLabels := getServiceLabels(resServiceLabels)
  206. applyServicesToPods(podMap, podLabels, serviceLabels)
  207. podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
  208. podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
  209. podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels)
  210. podJobMap := resToPodJobMap(resJobLabels)
  211. applyControllersToPods(podMap, podDeploymentMap)
  212. applyControllersToPods(podMap, podStatefulSetMap)
  213. applyControllersToPods(podMap, podDaemonSetMap)
  214. applyControllersToPods(podMap, podJobMap)
  215. // TODO breakdown network costs?
  216. // Build out a map of Nodes with resource costs, discounts, and node types
  217. // for converting resource allocation data to cumulative costs.
  218. nodeMap := map[nodeKey]*NodePricing{}
  219. applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
  220. applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
  221. applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
  222. applyNodeSpot(nodeMap, resNodeIsSpot)
  223. applyNodeDiscount(nodeMap, cm)
  224. // Build out the map of all PVs with class, size and cost-per-hour.
  225. // Note: this does not record time running, which we may want to
  226. // include later for increased PV precision. (As long as the PV has
  227. // a PVC, we get time running there, so this is only inaccurate
  228. // for short-lived, unmounted PVs.)
  229. pvMap := map[pvKey]*PV{}
  230. buildPVMap(pvMap, resPVCostPerGiBHour)
  231. applyPVBytes(pvMap, resPVBytes)
  232. // Build out the map of all PVCs with time running, bytes requested,
  233. // and connect to the correct PV from pvMap. (If no PV exists, that
  234. // is noted, but does not result in any allocation/cost.)
  235. pvcMap := map[pvcKey]*PVC{}
  236. buildPVCMap(window, pvcMap, pvMap, resPVCInfo)
  237. applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
  238. // Build out the relationships of pods to their PVCs. This step
  239. // populates the PVC.Count field so that PVC allocation can be
  240. // split appropriately among each pod's container allocation.
  241. podPVCMap := map[podKey][]*PVC{}
  242. buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation)
  243. // Identify unmounted PVs (PVs without PVCs) and add one Allocation per
  244. // cluster representing each cluster's unmounted PVs (if necessary).
  245. applyUnmountedPVs(window, podMap, pvMap, pvcMap)
  246. // (3) Build out AllocationSet from Pod map
  247. for _, pod := range podMap {
  248. for _, alloc := range pod.Allocations {
  249. cluster, _ := alloc.Properties.GetCluster()
  250. nodeName, _ := alloc.Properties.GetNode()
  251. namespace, _ := alloc.Properties.GetNamespace()
  252. pod, _ := alloc.Properties.GetPod()
  253. container, _ := alloc.Properties.GetContainer()
  254. podKey := newPodKey(cluster, namespace, pod)
  255. nodeKey := newNodeKey(cluster, nodeName)
  256. node := cm.getNodePricing(nodeMap, nodeKey)
  257. alloc.CPUCost = alloc.CPUCoreHours * node.CostPerCPUHr
  258. alloc.RAMCost = (alloc.RAMByteHours / 1024 / 1024 / 1024) * node.CostPerRAMGiBHr
  259. alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
  260. if pvcs, ok := podPVCMap[podKey]; ok {
  261. for _, pvc := range pvcs {
  262. // Determine the (start, end) of the relationship between the
  263. // given PVC and the associated Allocation so that a precise
  264. // number of hours can be used to compute cumulative cost.
  265. s, e := alloc.Start, alloc.End
  266. if pvc.Start.After(alloc.Start) {
  267. s = pvc.Start
  268. }
  269. if pvc.End.Before(alloc.End) {
  270. e = pvc.End
  271. }
  272. minutes := e.Sub(s).Minutes()
  273. hrs := minutes / 60.0
  274. count := float64(pvc.Count)
  275. if pvc.Count < 1 {
  276. count = 1
  277. }
  278. gib := pvc.Bytes / 1024 / 1024 / 1024
  279. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  280. // Apply the size and cost of the PV to the allocation, each
  281. // weighted by count (i.e. the number of containers in the pod)
  282. alloc.PVByteHours += pvc.Bytes * hrs / count
  283. alloc.PVCost += cost / count
  284. }
  285. }
  286. // Make sure that the name is correct (node may not be present at this
  287. // point due to it missing from queryMinutes) then insert.
  288. alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, pod, container)
  289. allocSet.Set(alloc)
  290. }
  291. }
  292. return allocSet, nil
  293. }
  294. func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSize time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time) error {
  295. // Assumes that window is positive and closed
  296. start, end := *window.Start(), *window.End()
  297. // Convert resolution duration to a query-ready string
  298. resStr := util.DurationString(resolution)
  299. ctx := prom.NewContext(cm.PrometheusClient)
  300. // Query for (start, end) by (pod, namespace, cluster) over the given
  301. // window, using the given resolution, and if necessary in batches no
  302. // larger than the given maximum batch size. If working in batches, track
  303. // overall progress by starting with (window.start, window.start) and
  304. // querying in batches no larger than maxBatchSize from start-to-end,
  305. // folding each result set into podMap as the results come back.
  306. coverage := kubecost.NewWindow(&start, &start)
  307. numQuery := 1
  308. for coverage.End().Before(end) {
  309. // Determine the (start, end) of the current batch
  310. batchStart := *coverage.End()
  311. batchEnd := coverage.End().Add(maxBatchSize)
  312. if batchEnd.After(end) {
  313. batchEnd = end
  314. }
  315. batchWindow := kubecost.NewWindow(&batchStart, &batchEnd)
  316. var resPods []*prom.QueryResult
  317. var err error
  318. maxTries := 3
  319. numTries := 0
  320. for resPods == nil && numTries < maxTries {
  321. numTries++
  322. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  323. // including handling Thanos offset
  324. durStr, offStr, err := batchWindow.DurationOffsetForPrometheus()
  325. if err != nil || durStr == "" {
  326. // Negative duration, so set empty results and don't query
  327. resPods = []*prom.QueryResult{}
  328. err = nil
  329. break
  330. }
  331. // Submit and profile query
  332. queryPods := fmt.Sprintf(queryFmtPods, durStr, resStr, offStr)
  333. queryProfile := time.Now()
  334. resPods, err = ctx.Query(queryPods).Await()
  335. if err != nil {
  336. log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
  337. resPods = nil
  338. }
  339. }
  340. if err != nil {
  341. return err
  342. }
  343. applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods)
  344. coverage = coverage.ExpandEnd(batchEnd)
  345. numQuery++
  346. }
  347. return nil
  348. }
  349. func applyPodResults(window kubecost.Window, resolution time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, resPods []*prom.QueryResult) {
  350. for _, res := range resPods {
  351. if len(res.Values) == 0 {
  352. log.Warningf("CostModel.ComputeAllocation: empty minutes result")
  353. continue
  354. }
  355. cluster, err := res.GetString("cluster_id")
  356. if err != nil {
  357. cluster = env.GetClusterID()
  358. }
  359. labels, err := res.GetStrings("namespace", "pod")
  360. if err != nil {
  361. log.Warningf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
  362. continue
  363. }
  364. namespace := labels["namespace"]
  365. pod := labels["pod"]
  366. key := newPodKey(cluster, namespace, pod)
  367. // allocStart and allocEnd are the timestamps of the first and last
  368. // minutes the pod was running, respectively. We subtract one resolution
  369. // from allocStart because this point will actually represent the end
  370. // of the first minute. We don't subtract from allocEnd because it
  371. // already represents the end of the last minute.
  372. var allocStart, allocEnd time.Time
  373. startAdjustmentCoeff, endAdjustmentCoeff := 1.0, 1.0
  374. for _, datum := range res.Values {
  375. t := time.Unix(int64(datum.Timestamp), 0)
  376. if allocStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  377. // Set the start timestamp to the earliest non-zero timestamp
  378. allocStart = t
  379. // Record adjustment coefficient, i.e. the portion of the start
  380. // timestamp to "ignore". That is, sometimes the value will be
  381. // 0.5, meaning that we should discount the time running by
  382. // half of the resolution the timestamp stands for.
  383. startAdjustmentCoeff = (1.0 - datum.Value)
  384. }
  385. if datum.Value > 0 && window.Contains(t) {
  386. // Set the end timestamp to the latest non-zero timestamp
  387. allocEnd = t
  388. // Record adjustment coefficient, i.e. the portion of the end
  389. // timestamp to "ignore". (See explanation above for start.)
  390. endAdjustmentCoeff = (1.0 - datum.Value)
  391. }
  392. }
  393. if allocStart.IsZero() || allocEnd.IsZero() {
  394. continue
  395. }
  396. // Adjust timestamps according to the resolution and the adjustment
  397. // coefficients, as described above. That is, count the start timestamp
  398. // from the beginning of the resolution, not the end. Then "reduce" the
  399. // start and end by the correct amount, in the case that the "running"
  400. // value of the first or last timestamp was not a full 1.0.
  401. allocStart = allocStart.Add(-resolution)
  402. // Note: the *100 and /100 are necessary because Duration is an int, so
  403. // 0.5, for instance, will be truncated, resulting in no adjustment.
  404. allocStart = allocStart.Add(time.Duration(startAdjustmentCoeff*100) * resolution / time.Duration(100))
  405. allocEnd = allocEnd.Add(-time.Duration(endAdjustmentCoeff*100) * resolution / time.Duration(100))
  406. // If there is only one point with a value <= 0.5 that the start and
  407. // end timestamps both share, then we will enter this case because at
  408. // least half of a resolution will be subtracted from both the start
  409. // and the end. If that is the case, then add back half of each side
  410. // so that the pod is said to run for half a resolution total.
  411. // e.g. For resolution 1m and a value of 0.5 at one timestamp, we'll
  412. // end up with allocEnd == allocStart and each coeff == 0.5. In
  413. // that case, add 0.25m to each side, resulting in 0.5m duration.
  414. if !allocEnd.After(allocStart) {
  415. allocStart = allocStart.Add(-time.Duration(50*startAdjustmentCoeff) * resolution / time.Duration(100))
  416. allocEnd = allocEnd.Add(time.Duration(50*endAdjustmentCoeff) * resolution / time.Duration(100))
  417. }
  418. // Set start if unset or this datum's start time is earlier than the
  419. // current earliest time.
  420. if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
  421. clusterStart[cluster] = allocStart
  422. }
  423. // Set end if unset or this datum's end time is later than the
  424. // current latest time.
  425. if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
  426. clusterEnd[cluster] = allocEnd
  427. }
  428. if pod, ok := podMap[key]; ok {
  429. // Pod has already been recorded, so update it accordingly
  430. if allocStart.Before(pod.Start) {
  431. pod.Start = allocStart
  432. }
  433. if allocEnd.After(pod.End) {
  434. pod.End = allocEnd
  435. }
  436. } else {
  437. // Pod has not been recorded yet, so insert it
  438. podMap[key] = &Pod{
  439. Window: window.Clone(),
  440. Start: allocStart,
  441. End: allocEnd,
  442. Key: key,
  443. Allocations: map[string]*kubecost.Allocation{},
  444. }
  445. }
  446. }
  447. }
  448. func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom.QueryResult) {
  449. for _, res := range resCPUCoresAllocated {
  450. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  451. if err != nil {
  452. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
  453. continue
  454. }
  455. pod, ok := podMap[key]
  456. if !ok {
  457. continue
  458. }
  459. container, err := res.GetString("container")
  460. if err != nil {
  461. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'container': %s", key)
  462. continue
  463. }
  464. if _, ok := pod.Allocations[container]; !ok {
  465. pod.AppendContainer(container)
  466. }
  467. cpuCores := res.Values[0].Value
  468. hours := pod.Allocations[container].Minutes() / 60.0
  469. pod.Allocations[container].CPUCoreHours = cpuCores * hours
  470. node, err := res.GetString("node")
  471. if err != nil {
  472. log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
  473. continue
  474. }
  475. pod.Allocations[container].Properties.SetNode(node)
  476. }
  477. }
  478. func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom.QueryResult) {
  479. for _, res := range resCPUCoresRequested {
  480. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  481. if err != nil {
  482. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
  483. continue
  484. }
  485. pod, ok := podMap[key]
  486. if !ok {
  487. continue
  488. }
  489. container, err := res.GetString("container")
  490. if err != nil {
  491. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'container': %s", key)
  492. continue
  493. }
  494. if _, ok := pod.Allocations[container]; !ok {
  495. pod.AppendContainer(container)
  496. }
  497. pod.Allocations[container].CPUCoreRequestAverage = res.Values[0].Value
  498. // If CPU allocation is less than requests, set CPUCoreHours to
  499. // request level.
  500. if pod.Allocations[container].CPUCores() < res.Values[0].Value {
  501. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  502. }
  503. node, err := res.GetString("node")
  504. if err != nil {
  505. log.Warningf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
  506. continue
  507. }
  508. pod.Allocations[container].Properties.SetNode(node)
  509. }
  510. }
  511. func applyCPUCoresUsed(podMap map[podKey]*Pod, resCPUCoresUsed []*prom.QueryResult) {
  512. for _, res := range resCPUCoresUsed {
  513. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  514. if err != nil {
  515. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage result missing field: %s", err)
  516. continue
  517. }
  518. pod, ok := podMap[key]
  519. if !ok {
  520. continue
  521. }
  522. container, err := res.GetString("container_name")
  523. if err != nil {
  524. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage query result missing 'container': %s", key)
  525. continue
  526. }
  527. if _, ok := pod.Allocations[container]; !ok {
  528. pod.AppendContainer(container)
  529. }
  530. pod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
  531. }
  532. }
  533. func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom.QueryResult) {
  534. for _, res := range resRAMBytesAllocated {
  535. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  536. if err != nil {
  537. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
  538. continue
  539. }
  540. pod, ok := podMap[key]
  541. if !ok {
  542. continue
  543. }
  544. container, err := res.GetString("container")
  545. if err != nil {
  546. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'container': %s", key)
  547. continue
  548. }
  549. if _, ok := pod.Allocations[container]; !ok {
  550. pod.AppendContainer(container)
  551. }
  552. ramBytes := res.Values[0].Value
  553. hours := pod.Allocations[container].Minutes() / 60.0
  554. pod.Allocations[container].RAMByteHours = ramBytes * hours
  555. node, err := res.GetString("node")
  556. if err != nil {
  557. log.Warningf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
  558. continue
  559. }
  560. pod.Allocations[container].Properties.SetNode(node)
  561. }
  562. }
  563. func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom.QueryResult) {
  564. for _, res := range resRAMBytesRequested {
  565. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  566. if err != nil {
  567. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
  568. continue
  569. }
  570. pod, ok := podMap[key]
  571. if !ok {
  572. continue
  573. }
  574. container, err := res.GetString("container")
  575. if err != nil {
  576. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'container': %s", key)
  577. continue
  578. }
  579. if _, ok := pod.Allocations[container]; !ok {
  580. pod.AppendContainer(container)
  581. }
  582. pod.Allocations[container].RAMBytesRequestAverage = res.Values[0].Value
  583. // If RAM allocation is less than requests, set RAMByteHours to
  584. // request level.
  585. if pod.Allocations[container].RAMBytes() < res.Values[0].Value {
  586. pod.Allocations[container].RAMByteHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  587. }
  588. node, err := res.GetString("node")
  589. if err != nil {
  590. log.Warningf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
  591. continue
  592. }
  593. pod.Allocations[container].Properties.SetNode(node)
  594. }
  595. }
  596. func applyRAMBytesUsed(podMap map[podKey]*Pod, resRAMBytesUsed []*prom.QueryResult) {
  597. for _, res := range resRAMBytesUsed {
  598. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  599. if err != nil {
  600. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage result missing field: %s", err)
  601. continue
  602. }
  603. pod, ok := podMap[key]
  604. if !ok {
  605. continue
  606. }
  607. container, err := res.GetString("container_name")
  608. if err != nil {
  609. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage query result missing 'container': %s", key)
  610. continue
  611. }
  612. if _, ok := pod.Allocations[container]; !ok {
  613. pod.AppendContainer(container)
  614. }
  615. pod.Allocations[container].RAMBytesUsageAverage = res.Values[0].Value
  616. }
  617. }
  618. func applyGPUsRequested(podMap map[podKey]*Pod, resGPUsRequested []*prom.QueryResult) {
  619. for _, res := range resGPUsRequested {
  620. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  621. if err != nil {
  622. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
  623. continue
  624. }
  625. pod, ok := podMap[key]
  626. if !ok {
  627. continue
  628. }
  629. container, err := res.GetString("container")
  630. if err != nil {
  631. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request query result missing 'container': %s", key)
  632. continue
  633. }
  634. if _, ok := pod.Allocations[container]; !ok {
  635. pod.AppendContainer(container)
  636. }
  637. hrs := pod.Allocations[container].Minutes() / 60.0
  638. pod.Allocations[container].GPUHours = res.Values[0].Value * hrs
  639. }
  640. }
  641. func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult) {
  642. costPerGiBByCluster := map[string]float64{}
  643. for _, res := range resNetworkCostPerGiB {
  644. cluster, err := res.GetString("cluster_id")
  645. if err != nil {
  646. cluster = env.GetClusterID()
  647. }
  648. costPerGiBByCluster[cluster] = res.Values[0].Value
  649. }
  650. for _, res := range resNetworkGiB {
  651. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  652. if err != nil {
  653. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
  654. continue
  655. }
  656. pod, ok := podMap[podKey]
  657. if !ok {
  658. continue
  659. }
  660. for _, alloc := range pod.Allocations {
  661. gib := res.Values[0].Value / float64(len(pod.Allocations))
  662. costPerGiB := costPerGiBByCluster[podKey.Cluster]
  663. alloc.NetworkCost = gib * costPerGiB
  664. }
  665. }
  666. }
  667. func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[string]map[string]string {
  668. namespaceLabels := map[string]map[string]string{}
  669. for _, res := range resNamespaceLabels {
  670. namespace, err := res.GetString("namespace")
  671. if err != nil {
  672. continue
  673. }
  674. if _, ok := namespaceLabels[namespace]; !ok {
  675. namespaceLabels[namespace] = map[string]string{}
  676. }
  677. for k, l := range res.GetLabels() {
  678. namespaceLabels[namespace][k] = l
  679. }
  680. }
  681. return namespaceLabels
  682. }
  683. func resToPodLabels(resPodLabels []*prom.QueryResult) map[podKey]map[string]string {
  684. podLabels := map[podKey]map[string]string{}
  685. for _, res := range resPodLabels {
  686. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  687. if err != nil {
  688. continue
  689. }
  690. if _, ok := podLabels[podKey]; !ok {
  691. podLabels[podKey] = map[string]string{}
  692. }
  693. for k, l := range res.GetLabels() {
  694. podLabels[podKey][k] = l
  695. }
  696. }
  697. return podLabels
  698. }
  699. func resToNamespaceAnnotations(resNamespaceAnnotations []*prom.QueryResult) map[string]map[string]string {
  700. namespaceAnnotations := map[string]map[string]string{}
  701. for _, res := range resNamespaceAnnotations {
  702. namespace, err := res.GetString("namespace")
  703. if err != nil {
  704. continue
  705. }
  706. if _, ok := namespaceAnnotations[namespace]; !ok {
  707. namespaceAnnotations[namespace] = map[string]string{}
  708. }
  709. for k, l := range res.GetAnnotations() {
  710. namespaceAnnotations[namespace][k] = l
  711. }
  712. }
  713. return namespaceAnnotations
  714. }
  715. func resToPodAnnotations(resPodAnnotations []*prom.QueryResult) map[podKey]map[string]string {
  716. podAnnotations := map[podKey]map[string]string{}
  717. for _, res := range resPodAnnotations {
  718. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  719. if err != nil {
  720. continue
  721. }
  722. if _, ok := podAnnotations[podKey]; !ok {
  723. podAnnotations[podKey] = map[string]string{}
  724. }
  725. for k, l := range res.GetAnnotations() {
  726. podAnnotations[podKey][k] = l
  727. }
  728. }
  729. return podAnnotations
  730. }
  731. func applyLabels(podMap map[podKey]*Pod, namespaceLabels map[string]map[string]string, podLabels map[podKey]map[string]string) {
  732. for key, pod := range podMap {
  733. for _, alloc := range pod.Allocations {
  734. allocLabels, err := alloc.Properties.GetLabels()
  735. if err != nil {
  736. allocLabels = map[string]string{}
  737. }
  738. // Apply namespace labels first, then pod labels so that pod labels
  739. // overwrite namespace labels.
  740. if labels, ok := namespaceLabels[key.Namespace]; ok {
  741. for k, v := range labels {
  742. allocLabels[k] = v
  743. }
  744. }
  745. if labels, ok := podLabels[key]; ok {
  746. for k, v := range labels {
  747. allocLabels[k] = v
  748. }
  749. }
  750. alloc.Properties.SetLabels(allocLabels)
  751. }
  752. }
  753. }
  754. func applyAnnotations(podMap map[podKey]*Pod, namespaceAnnotations map[string]map[string]string, podAnnotations map[podKey]map[string]string) {
  755. for key, pod := range podMap {
  756. for _, alloc := range pod.Allocations {
  757. allocAnnotations, err := alloc.Properties.GetAnnotations()
  758. if err != nil {
  759. allocAnnotations = map[string]string{}
  760. }
  761. // Apply namespace annotations first, then pod annotations so that
  762. // pod labels overwrite namespace labels.
  763. if labels, ok := namespaceAnnotations[key.Namespace]; ok {
  764. for k, v := range labels {
  765. allocAnnotations[k] = v
  766. }
  767. }
  768. if labels, ok := podAnnotations[key]; ok {
  769. for k, v := range labels {
  770. allocAnnotations[k] = v
  771. }
  772. }
  773. alloc.Properties.SetAnnotations(allocAnnotations)
  774. }
  775. }
  776. }
  777. func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[string]string {
  778. serviceLabels := map[serviceKey]map[string]string{}
  779. for _, res := range resServiceLabels {
  780. serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service")
  781. if err != nil {
  782. continue
  783. }
  784. if _, ok := serviceLabels[serviceKey]; !ok {
  785. serviceLabels[serviceKey] = map[string]string{}
  786. }
  787. for k, l := range res.GetLabels() {
  788. serviceLabels[serviceKey][k] = l
  789. }
  790. }
  791. // Prune duplicate services. That is, if the same service exists with
  792. // hyphens instead of underscores, keep the one that uses hyphens.
  793. for key := range serviceLabels {
  794. if strings.Contains(key.Service, "_") {
  795. duplicateService := strings.Replace(key.Service, "_", "-", -1)
  796. duplicateKey := newServiceKey(key.Cluster, key.Namespace, duplicateService)
  797. if _, ok := serviceLabels[duplicateKey]; ok {
  798. delete(serviceLabels, key)
  799. }
  800. }
  801. }
  802. return serviceLabels
  803. }
  804. func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  805. deploymentLabels := map[controllerKey]map[string]string{}
  806. for _, res := range resDeploymentLabels {
  807. controllerKey, err := resultDeploymentKey(res, "cluster_id", "namespace", "deployment")
  808. if err != nil {
  809. continue
  810. }
  811. if _, ok := deploymentLabels[controllerKey]; !ok {
  812. deploymentLabels[controllerKey] = map[string]string{}
  813. }
  814. for k, l := range res.GetLabels() {
  815. deploymentLabels[controllerKey][k] = l
  816. }
  817. }
  818. // Prune duplicate deployments. That is, if the same deployment exists with
  819. // hyphens instead of underscores, keep the one that uses hyphens.
  820. for key := range deploymentLabels {
  821. if strings.Contains(key.Controller, "_") {
  822. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  823. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  824. if _, ok := deploymentLabels[duplicateKey]; ok {
  825. delete(deploymentLabels, key)
  826. }
  827. }
  828. }
  829. return deploymentLabels
  830. }
  831. func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  832. statefulSetLabels := map[controllerKey]map[string]string{}
  833. for _, res := range resStatefulSetLabels {
  834. controllerKey, err := resultStatefulSetKey(res, "cluster_id", "namespace", "statefulSet")
  835. if err != nil {
  836. continue
  837. }
  838. if _, ok := statefulSetLabels[controllerKey]; !ok {
  839. statefulSetLabels[controllerKey] = map[string]string{}
  840. }
  841. for k, l := range res.GetLabels() {
  842. statefulSetLabels[controllerKey][k] = l
  843. }
  844. }
  845. // Prune duplicate stateful sets. That is, if the same stateful set exists
  846. // with hyphens instead of underscores, keep the one that uses hyphens.
  847. for key := range statefulSetLabels {
  848. if strings.Contains(key.Controller, "_") {
  849. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  850. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  851. if _, ok := statefulSetLabels[duplicateKey]; ok {
  852. delete(statefulSetLabels, key)
  853. }
  854. }
  855. }
  856. return statefulSetLabels
  857. }
  858. func labelsToPodControllerMap(podLabels map[podKey]map[string]string, controllerLabels map[controllerKey]map[string]string) map[podKey]controllerKey {
  859. podControllerMap := map[podKey]controllerKey{}
  860. // For each controller, turn the labels into a selector and attempt to
  861. // match it with each set of pod labels. A match indicates that the pod
  862. // belongs to the controller.
  863. for cKey, cLabels := range controllerLabels {
  864. selector := labels.Set(cLabels).AsSelectorPreValidated()
  865. for pKey, pLabels := range podLabels {
  866. // If the pod is in a different cluster or namespace, there is
  867. // no need to compare the labels.
  868. if cKey.Cluster != pKey.Cluster || cKey.Namespace != pKey.Namespace {
  869. continue
  870. }
  871. podLabelSet := labels.Set(pLabels)
  872. if selector.Matches(podLabelSet) {
  873. if _, ok := podControllerMap[pKey]; ok {
  874. log.Warningf("CostModel.ComputeAllocation: PodControllerMap match already exists: %s matches %s and %s", pKey, podControllerMap[pKey], cKey)
  875. }
  876. podControllerMap[pKey] = cKey
  877. }
  878. }
  879. }
  880. return podControllerMap
  881. }
  882. func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult) map[podKey]controllerKey {
  883. daemonSetLabels := map[podKey]controllerKey{}
  884. for _, res := range resDaemonSetLabels {
  885. controllerKey, err := resultDaemonSetKey(res, "cluster_id", "namespace", "owner_name")
  886. if err != nil {
  887. continue
  888. }
  889. pod, err := res.GetString("pod")
  890. if err != nil {
  891. log.Warningf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
  892. }
  893. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  894. daemonSetLabels[podKey] = controllerKey
  895. }
  896. return daemonSetLabels
  897. }
  898. func resToPodJobMap(resJobLabels []*prom.QueryResult) map[podKey]controllerKey {
  899. jobLabels := map[podKey]controllerKey{}
  900. for _, res := range resJobLabels {
  901. controllerKey, err := resultJobKey(res, "cluster_id", "namespace", "owner_name")
  902. if err != nil {
  903. continue
  904. }
  905. // Convert the name of Jobs generated by CronJobs to the name of the
  906. // CronJob by stripping the timestamp off the end.
  907. match := isCron.FindStringSubmatch(controllerKey.Controller)
  908. if match != nil {
  909. controllerKey.Controller = match[1]
  910. }
  911. pod, err := res.GetString("pod")
  912. if err != nil {
  913. log.Warningf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
  914. }
  915. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  916. jobLabels[podKey] = controllerKey
  917. }
  918. return jobLabels
  919. }
  920. func applyServicesToPods(podMap map[podKey]*Pod, podLabels map[podKey]map[string]string, serviceLabels map[serviceKey]map[string]string) {
  921. podServicesMap := map[podKey][]serviceKey{}
  922. // For each service, turn the labels into a selector and attempt to
  923. // match it with each set of pod labels. A match indicates that the pod
  924. // belongs to the service.
  925. for sKey, sLabels := range serviceLabels {
  926. selector := labels.Set(sLabels).AsSelectorPreValidated()
  927. for pKey, pLabels := range podLabels {
  928. // If the pod is in a different cluster or namespace, there is
  929. // no need to compare the labels.
  930. if sKey.Cluster != pKey.Cluster || sKey.Namespace != pKey.Namespace {
  931. continue
  932. }
  933. podLabelSet := labels.Set(pLabels)
  934. if selector.Matches(podLabelSet) {
  935. if _, ok := podServicesMap[pKey]; !ok {
  936. podServicesMap[pKey] = []serviceKey{}
  937. }
  938. podServicesMap[pKey] = append(podServicesMap[pKey], sKey)
  939. }
  940. }
  941. }
  942. // For each allocation in each pod, attempt to find and apply the list of
  943. // services associated with the allocation's pod.
  944. for key, pod := range podMap {
  945. for _, alloc := range pod.Allocations {
  946. if sKeys, ok := podServicesMap[key]; ok {
  947. services := []string{}
  948. for _, sKey := range sKeys {
  949. services = append(services, sKey.Service)
  950. }
  951. alloc.Properties.SetServices(services)
  952. }
  953. }
  954. }
  955. }
  956. func applyControllersToPods(podMap map[podKey]*Pod, podControllerMap map[podKey]controllerKey) {
  957. for key, pod := range podMap {
  958. for _, alloc := range pod.Allocations {
  959. if controllerKey, ok := podControllerMap[key]; ok {
  960. alloc.Properties.SetControllerKind(controllerKey.ControllerKind)
  961. alloc.Properties.SetController(controllerKey.Controller)
  962. }
  963. }
  964. }
  965. }
  966. func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult) {
  967. for _, res := range resNodeCostPerCPUHr {
  968. cluster, err := res.GetString("cluster_id")
  969. if err != nil {
  970. cluster = env.GetClusterID()
  971. }
  972. node, err := res.GetString("node")
  973. if err != nil {
  974. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  975. continue
  976. }
  977. instanceType, err := res.GetString("instance_type")
  978. if err != nil {
  979. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  980. continue
  981. }
  982. key := newNodeKey(cluster, node)
  983. if _, ok := nodeMap[key]; !ok {
  984. nodeMap[key] = &NodePricing{
  985. Name: node,
  986. NodeType: instanceType,
  987. }
  988. }
  989. nodeMap[key].CostPerCPUHr = res.Values[0].Value
  990. }
  991. }
  992. func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult) {
  993. for _, res := range resNodeCostPerRAMGiBHr {
  994. cluster, err := res.GetString("cluster_id")
  995. if err != nil {
  996. cluster = env.GetClusterID()
  997. }
  998. node, err := res.GetString("node")
  999. if err != nil {
  1000. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1001. continue
  1002. }
  1003. instanceType, err := res.GetString("instance_type")
  1004. if err != nil {
  1005. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1006. continue
  1007. }
  1008. key := newNodeKey(cluster, node)
  1009. if _, ok := nodeMap[key]; !ok {
  1010. nodeMap[key] = &NodePricing{
  1011. Name: node,
  1012. NodeType: instanceType,
  1013. }
  1014. }
  1015. nodeMap[key].CostPerRAMGiBHr = res.Values[0].Value
  1016. }
  1017. }
  1018. func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult) {
  1019. for _, res := range resNodeCostPerGPUHr {
  1020. cluster, err := res.GetString("cluster_id")
  1021. if err != nil {
  1022. cluster = env.GetClusterID()
  1023. }
  1024. node, err := res.GetString("node")
  1025. if err != nil {
  1026. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1027. continue
  1028. }
  1029. instanceType, err := res.GetString("instance_type")
  1030. if err != nil {
  1031. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1032. continue
  1033. }
  1034. key := newNodeKey(cluster, node)
  1035. if _, ok := nodeMap[key]; !ok {
  1036. nodeMap[key] = &NodePricing{
  1037. Name: node,
  1038. NodeType: instanceType,
  1039. }
  1040. }
  1041. nodeMap[key].CostPerGPUHr = res.Values[0].Value
  1042. }
  1043. }
  1044. func applyNodeSpot(nodeMap map[nodeKey]*NodePricing, resNodeIsSpot []*prom.QueryResult) {
  1045. for _, res := range resNodeIsSpot {
  1046. cluster, err := res.GetString("cluster_id")
  1047. if err != nil {
  1048. cluster = env.GetClusterID()
  1049. }
  1050. node, err := res.GetString("node")
  1051. if err != nil {
  1052. log.Warningf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
  1053. continue
  1054. }
  1055. key := newNodeKey(cluster, node)
  1056. if _, ok := nodeMap[key]; !ok {
  1057. log.Warningf("CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
  1058. continue
  1059. }
  1060. nodeMap[key].Preemptible = res.Values[0].Value > 0
  1061. }
  1062. }
  1063. func applyNodeDiscount(nodeMap map[nodeKey]*NodePricing, cm *CostModel) {
  1064. if cm == nil {
  1065. return
  1066. }
  1067. c, err := cm.Provider.GetConfig()
  1068. if err != nil {
  1069. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1070. return
  1071. }
  1072. discount, err := ParsePercentString(c.Discount)
  1073. if err != nil {
  1074. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1075. return
  1076. }
  1077. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1078. if err != nil {
  1079. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1080. return
  1081. }
  1082. for _, node := range nodeMap {
  1083. // TODO GKE Reserved Instances into account
  1084. node.Discount = cm.Provider.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  1085. node.CostPerCPUHr *= (1.0 - node.Discount)
  1086. node.CostPerRAMGiBHr *= (1.0 - node.Discount)
  1087. }
  1088. }
  1089. func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
  1090. for _, res := range resPVCostPerGiBHour {
  1091. cluster, err := res.GetString("cluster_id")
  1092. if err != nil {
  1093. cluster = env.GetClusterID()
  1094. }
  1095. name, err := res.GetString("volumename")
  1096. if err != nil {
  1097. log.Warningf("CostModel.ComputeAllocation: PV cost without volumename")
  1098. continue
  1099. }
  1100. key := newPVKey(cluster, name)
  1101. pvMap[key] = &PV{
  1102. Cluster: cluster,
  1103. Name: name,
  1104. CostPerGiBHour: res.Values[0].Value,
  1105. }
  1106. }
  1107. }
  1108. func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
  1109. for _, res := range resPVBytes {
  1110. key, err := resultPVKey(res, "cluster_id", "persistentvolume")
  1111. if err != nil {
  1112. log.Warningf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
  1113. continue
  1114. }
  1115. if _, ok := pvMap[key]; !ok {
  1116. log.Warningf("CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
  1117. continue
  1118. }
  1119. pvMap[key].Bytes = res.Values[0].Value
  1120. }
  1121. }
  1122. func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey]*PV, resPVCInfo []*prom.QueryResult) {
  1123. for _, res := range resPVCInfo {
  1124. cluster, err := res.GetString("cluster_id")
  1125. if err != nil {
  1126. cluster = env.GetClusterID()
  1127. }
  1128. values, err := res.GetStrings("persistentvolumeclaim", "storageclass", "volumename", "namespace")
  1129. if err != nil {
  1130. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC info query result missing field: %s", err)
  1131. continue
  1132. }
  1133. namespace := values["namespace"]
  1134. name := values["persistentvolumeclaim"]
  1135. volume := values["volumename"]
  1136. storageClass := values["storageclass"]
  1137. pvKey := newPVKey(cluster, volume)
  1138. pvcKey := newPVCKey(cluster, namespace, name)
  1139. // pvcStart and pvcEnd are the timestamps of the first and last minutes
  1140. // the PVC was running, respectively. We subtract 1m from pvcStart
  1141. // because this point will actually represent the end of the first
  1142. // minute. We don't subtract from pvcEnd because it already represents
  1143. // the end of the last minute.
  1144. var pvcStart, pvcEnd time.Time
  1145. for _, datum := range res.Values {
  1146. t := time.Unix(int64(datum.Timestamp), 0)
  1147. if pvcStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  1148. pvcStart = t
  1149. }
  1150. if datum.Value > 0 && window.Contains(t) {
  1151. pvcEnd = t
  1152. }
  1153. }
  1154. if pvcStart.IsZero() || pvcEnd.IsZero() {
  1155. log.Warningf("CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
  1156. }
  1157. pvcStart = pvcStart.Add(-time.Minute)
  1158. if _, ok := pvMap[pvKey]; !ok {
  1159. continue
  1160. }
  1161. pvMap[pvKey].StorageClass = storageClass
  1162. if _, ok := pvcMap[pvcKey]; !ok {
  1163. pvcMap[pvcKey] = &PVC{}
  1164. }
  1165. pvcMap[pvcKey].Name = name
  1166. pvcMap[pvcKey].Namespace = namespace
  1167. pvcMap[pvcKey].Volume = pvMap[pvKey]
  1168. pvcMap[pvcKey].Start = pvcStart
  1169. pvcMap[pvcKey].End = pvcEnd
  1170. }
  1171. }
  1172. func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom.QueryResult) {
  1173. for _, res := range resPVCBytesRequested {
  1174. key, err := resultPVCKey(res, "cluster_id", "namespace", "persistentvolumeclaim")
  1175. if err != nil {
  1176. continue
  1177. }
  1178. if _, ok := pvcMap[key]; !ok {
  1179. continue
  1180. }
  1181. pvcMap[key].Bytes = res.Values[0].Value
  1182. }
  1183. }
  1184. func buildPodPVCMap(podPVCMap map[podKey][]*PVC, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC, podMap map[podKey]*Pod, resPodPVCAllocation []*prom.QueryResult) {
  1185. for _, res := range resPodPVCAllocation {
  1186. cluster, err := res.GetString("cluster_id")
  1187. if err != nil {
  1188. cluster = env.GetClusterID()
  1189. }
  1190. values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace")
  1191. if err != nil {
  1192. log.Warningf("CostModel.ComputeAllocation: PVC allocation query result missing field: %s", err)
  1193. continue
  1194. }
  1195. namespace := values["namespace"]
  1196. pod := values["pod"]
  1197. name := values["persistentvolumeclaim"]
  1198. volume := values["persistentvolume"]
  1199. podKey := newPodKey(cluster, namespace, pod)
  1200. pvKey := newPVKey(cluster, volume)
  1201. pvcKey := newPVCKey(cluster, namespace, name)
  1202. if _, ok := pvMap[pvKey]; !ok {
  1203. log.Warningf("CostModel.ComputeAllocation: PV missing for PVC allocation query result: %s", pvKey)
  1204. continue
  1205. }
  1206. if _, ok := podPVCMap[podKey]; !ok {
  1207. podPVCMap[podKey] = []*PVC{}
  1208. }
  1209. pvc, ok := pvcMap[pvcKey]
  1210. if !ok {
  1211. log.Warningf("CostModel.ComputeAllocation: PVC missing for PVC allocation query: %s", pvcKey)
  1212. continue
  1213. }
  1214. count := 1
  1215. if pod, ok := podMap[podKey]; ok && len(pod.Allocations) > 0 {
  1216. count = len(pod.Allocations)
  1217. } else {
  1218. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC %s for missing pod %s", pvcKey, podKey)
  1219. }
  1220. pvc.Count = count
  1221. pvc.Mounted = true
  1222. podPVCMap[podKey] = append(podPVCMap[podKey], pvc)
  1223. }
  1224. }
  1225. func applyUnmountedPVs(window kubecost.Window, podMap map[podKey]*Pod, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC) {
  1226. unmountedPVBytes := map[string]float64{}
  1227. unmountedPVCost := map[string]float64{}
  1228. for _, pv := range pvMap {
  1229. mounted := false
  1230. for _, pvc := range pvcMap {
  1231. if pvc.Volume == nil {
  1232. continue
  1233. }
  1234. if pvc.Volume == pv {
  1235. mounted = true
  1236. break
  1237. }
  1238. }
  1239. if !mounted {
  1240. gib := pv.Bytes / 1024 / 1024 / 1024
  1241. hrs := window.Minutes() / 60.0 // TODO improve with PV hours, not window hours
  1242. cost := pv.CostPerGiBHour * gib * hrs
  1243. unmountedPVCost[pv.Cluster] += cost
  1244. unmountedPVBytes[pv.Cluster] += pv.Bytes
  1245. }
  1246. }
  1247. for cluster, amount := range unmountedPVCost {
  1248. container := kubecost.UnmountedSuffix
  1249. pod := kubecost.UnmountedSuffix
  1250. namespace := kubecost.UnmountedSuffix
  1251. node := ""
  1252. key := newPodKey(cluster, namespace, pod)
  1253. podMap[key] = &Pod{
  1254. Window: window.Clone(),
  1255. Start: *window.Start(),
  1256. End: *window.End(),
  1257. Key: key,
  1258. Allocations: map[string]*kubecost.Allocation{},
  1259. }
  1260. podMap[key].AppendContainer(container)
  1261. podMap[key].Allocations[container].Properties.SetCluster(cluster)
  1262. podMap[key].Allocations[container].Properties.SetNode(node)
  1263. podMap[key].Allocations[container].Properties.SetNamespace(namespace)
  1264. podMap[key].Allocations[container].Properties.SetPod(pod)
  1265. podMap[key].Allocations[container].Properties.SetContainer(container)
  1266. podMap[key].Allocations[container].PVByteHours = unmountedPVBytes[cluster] * window.Minutes() / 60.0
  1267. podMap[key].Allocations[container].PVCost = amount
  1268. }
  1269. }
  1270. func applyUnmountedPVCs(window kubecost.Window, podMap map[podKey]*Pod, pvcMap map[pvcKey]*PVC) {
  1271. unmountedPVCBytes := map[namespaceKey]float64{}
  1272. unmountedPVCCost := map[namespaceKey]float64{}
  1273. for _, pvc := range pvcMap {
  1274. if !pvc.Mounted && pvc.Volume != nil {
  1275. key := newNamespaceKey(pvc.Cluster, pvc.Namespace)
  1276. gib := pvc.Volume.Bytes / 1024 / 1024 / 1024
  1277. hrs := pvc.Minutes() / 60.0
  1278. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  1279. unmountedPVCCost[key] += cost
  1280. unmountedPVCBytes[key] += pvc.Volume.Bytes
  1281. }
  1282. }
  1283. for key, amount := range unmountedPVCCost {
  1284. container := kubecost.UnmountedSuffix
  1285. pod := kubecost.UnmountedSuffix
  1286. namespace := key.Namespace
  1287. node := ""
  1288. cluster := key.Cluster
  1289. podKey := newPodKey(cluster, namespace, pod)
  1290. podMap[podKey] = &Pod{
  1291. Window: window.Clone(),
  1292. Start: *window.Start(),
  1293. End: *window.End(),
  1294. Key: podKey,
  1295. Allocations: map[string]*kubecost.Allocation{},
  1296. }
  1297. podMap[podKey].AppendContainer(container)
  1298. podMap[podKey].Allocations[container].Properties.SetCluster(cluster)
  1299. podMap[podKey].Allocations[container].Properties.SetNode(node)
  1300. podMap[podKey].Allocations[container].Properties.SetNamespace(namespace)
  1301. podMap[podKey].Allocations[container].Properties.SetPod(pod)
  1302. podMap[podKey].Allocations[container].Properties.SetContainer(container)
  1303. podMap[podKey].Allocations[container].PVByteHours = unmountedPVCBytes[key] * window.Minutes() / 60.0
  1304. podMap[podKey].Allocations[container].PVCost = amount
  1305. }
  1306. }
  1307. // getNodePricing determines node pricing, given a key and a mapping from keys
  1308. // to their NodePricing instances, as well as the custom pricing configuration
  1309. // inherent to the CostModel instance. If custom pricing is set, use that. If
  1310. // not, use the pricing defined by the given key. If that doesn't exist, fall
  1311. // back on custom pricing as a default.
  1312. func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*NodePricing, nodeKey nodeKey) *NodePricing {
  1313. // Find the relevant NodePricing, if it exists. If not, substitute the
  1314. // custom NodePricing as a default.
  1315. node, ok := nodeMap[nodeKey]
  1316. if !ok || node == nil {
  1317. if nodeKey.Node != "" {
  1318. log.Warningf("CostModel: failed to find node for %s", nodeKey)
  1319. }
  1320. return cm.getCustomNodePricing(false)
  1321. }
  1322. // If custom pricing is enabled and can be retrieved, override detected
  1323. // node pricing with the custom values.
  1324. customPricingConfig, err := cm.Provider.GetConfig()
  1325. if err != nil {
  1326. log.Warningf("CostModel: failed to load custom pricing: %s", err)
  1327. }
  1328. if cloud.CustomPricesEnabled(cm.Provider) && customPricingConfig != nil {
  1329. return cm.getCustomNodePricing(node.Preemptible)
  1330. }
  1331. node.Source = "prometheus"
  1332. // If any of the values are NaN or zero, replace them with the custom
  1333. // values as default.
  1334. // TODO:CLEANUP can't we parse these custom prices once? why do we store
  1335. // them as strings like this?
  1336. if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
  1337. log.Warningf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
  1338. cpuCostStr := customPricingConfig.CPU
  1339. if node.Preemptible {
  1340. cpuCostStr = customPricingConfig.SpotCPU
  1341. }
  1342. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1343. if err != nil {
  1344. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1345. }
  1346. node.CostPerCPUHr = costPerCPUHr
  1347. node.Source += "/customCPU"
  1348. }
  1349. if math.IsNaN(node.CostPerGPUHr) {
  1350. log.Warningf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
  1351. gpuCostStr := customPricingConfig.GPU
  1352. if node.Preemptible {
  1353. gpuCostStr = customPricingConfig.SpotGPU
  1354. }
  1355. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1356. if err != nil {
  1357. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1358. }
  1359. node.CostPerGPUHr = costPerGPUHr
  1360. node.Source += "/customGPU"
  1361. }
  1362. if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
  1363. log.Warningf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
  1364. ramCostStr := customPricingConfig.RAM
  1365. if node.Preemptible {
  1366. ramCostStr = customPricingConfig.SpotRAM
  1367. }
  1368. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1369. if err != nil {
  1370. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1371. }
  1372. node.CostPerRAMGiBHr = costPerRAMHr
  1373. node.Source += "/customRAM"
  1374. }
  1375. return node
  1376. }
  1377. // getCustomNodePricing converts the CostModel's configured custom pricing
  1378. // values into a NodePricing instance.
  1379. func (cm *CostModel) getCustomNodePricing(spot bool) *NodePricing {
  1380. customPricingConfig, err := cm.Provider.GetConfig()
  1381. if err != nil {
  1382. return nil
  1383. }
  1384. cpuCostStr := customPricingConfig.CPU
  1385. gpuCostStr := customPricingConfig.GPU
  1386. ramCostStr := customPricingConfig.RAM
  1387. if spot {
  1388. cpuCostStr = customPricingConfig.SpotCPU
  1389. gpuCostStr = customPricingConfig.SpotGPU
  1390. ramCostStr = customPricingConfig.SpotRAM
  1391. }
  1392. node := &NodePricing{Source: "custom"}
  1393. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1394. if err != nil {
  1395. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1396. }
  1397. node.CostPerCPUHr = costPerCPUHr
  1398. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1399. if err != nil {
  1400. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1401. }
  1402. node.CostPerGPUHr = costPerGPUHr
  1403. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1404. if err != nil {
  1405. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1406. }
  1407. node.CostPerRAMGiBHr = costPerRAMHr
  1408. return node
  1409. }
  // NodePricing describes the resource costs associated with a given node, as
  // well as the source of the information (e.g. prometheus, custom)
  type NodePricing struct {
  	// Name and NodeType identify the node; presumably the node name and its
  	// instance type — TODO confirm against where these are populated.
  	Name     string
  	NodeType string
  	// Preemptible selects spot rather than on-demand custom prices when
  	// custom pricing is applied to this node.
  	Preemptible bool
  	// CostPerCPUHr is the hourly price of CPU on this node.
  	CostPerCPUHr float64
  	// CostPerRAMGiBHr is the hourly price of RAM on this node.
  	CostPerRAMGiBHr float64
  	// CostPerGPUHr is the hourly price of GPU on this node.
  	CostPerGPUHr float64
  	// Discount is not read in this part of the file; presumably a discount
  	// applied to the node's costs — TODO confirm.
  	Discount float64
  	// Source records where the prices came from: "custom", or "prometheus"
  	// optionally followed by "/customCPU", "/customGPU" and/or "/customRAM"
  	// for each value that was replaced by a custom price.
  	Source string
  }
  // Pod describes a running pod's start and end time within a Window and
  // all the Allocations (i.e. containers) contained within it.
  type Pod struct {
  	// Window is the query window the pod was observed in; it is cloned into
  	// every allocation created by AppendContainer.
  	Window kubecost.Window
  	// Start and End bound the pod's run time and are copied into every
  	// allocation created by AppendContainer.
  	Start time.Time
  	End   time.Time
  	// Key identifies the pod by cluster, namespace and pod name.
  	Key podKey
  	// Allocations maps container name to that container's allocation. It must
  	// be non-nil before AppendContainer is called.
  	Allocations map[string]*kubecost.Allocation
  }
  1431. // AppendContainer adds an entry for the given container name to the Pod.
  1432. func (p Pod) AppendContainer(container string) {
  1433. name := fmt.Sprintf("%s/%s/%s/%s", p.Key.Cluster, p.Key.Namespace, p.Key.Pod, container)
  1434. alloc := &kubecost.Allocation{
  1435. Name: name,
  1436. Properties: kubecost.Properties{},
  1437. Window: p.Window.Clone(),
  1438. Start: p.Start,
  1439. End: p.End,
  1440. }
  1441. alloc.Properties.SetContainer(container)
  1442. alloc.Properties.SetPod(p.Key.Pod)
  1443. alloc.Properties.SetNamespace(p.Key.Namespace)
  1444. alloc.Properties.SetCluster(p.Key.Cluster)
  1445. p.Allocations[container] = alloc
  1446. }
  // PVC describes a PersistentVolumeClaim
  // TODO:CLEANUP move to pkg/kubecost?
  // TODO:CLEANUP add PersistentVolumeClaims field to type Allocation?
  type PVC struct {
  	// Bytes is the size used by Cost to price the claim.
  	Bytes float64 `json:"bytes"`
  	// Count is set by buildPodPVCMap to the number of allocations of the pod
  	// mounting the claim (1 when the pod is unknown or has none).
  	Count int `json:"count"`
  	// Name, Cluster and Namespace identify the claim.
  	Name      string `json:"name"`
  	Cluster   string `json:"cluster"`
  	Namespace string `json:"namespace"`
  	// Volume is the PV backing the claim; nil when no volume is associated.
  	Volume *PV `json:"persistentVolume"`
  	// Mounted is set to true by buildPodPVCMap once a pod is found to use the
  	// claim; claims left false are treated as unmounted.
  	Mounted bool `json:"mounted"`
  	// Start and End bound the period over which the claim is defined; their
  	// difference is what Minutes reports.
  	Start time.Time `json:"start"`
  	End   time.Time `json:"end"`
  }
  1461. // Cost computes the cumulative cost of the PVC
  1462. func (pvc *PVC) Cost() float64 {
  1463. if pvc == nil || pvc.Volume == nil {
  1464. return 0.0
  1465. }
  1466. gib := pvc.Bytes / 1024 / 1024 / 1024
  1467. hrs := pvc.Minutes() / 60.0
  1468. return pvc.Volume.CostPerGiBHour * gib * hrs
  1469. }
  1470. // Minutes computes the number of minutes over which the PVC is defined
  1471. func (pvc *PVC) Minutes() float64 {
  1472. if pvc == nil {
  1473. return 0.0
  1474. }
  1475. return pvc.End.Sub(pvc.Start).Minutes()
  1476. }
  1477. // String returns a string representation of the PVC
  1478. func (pvc *PVC) String() string {
  1479. if pvc == nil {
  1480. return "<nil>"
  1481. }
  1482. return fmt.Sprintf("%s/%s/%s{Bytes:%.2f, Cost:%.6f, Start,End:%s}", pvc.Cluster, pvc.Namespace, pvc.Name, pvc.Bytes, pvc.Cost(), kubecost.NewWindow(&pvc.Start, &pvc.End))
  1483. }
  // PV describes a PersistentVolume
  // TODO:CLEANUP move to pkg/kubecost?
  type PV struct {
  	// Bytes is the size of the volume in bytes.
  	Bytes float64 `json:"bytes"`
  	// CostPerGiBHour is the price of one GiB of this volume for one hour.
  	CostPerGiBHour float64 `json:"costPerGiBHour"`
  	// Cluster and Name identify the volume.
  	Cluster string `json:"cluster"`
  	Name    string `json:"name"`
  	// StorageClass is the name of the volume's storage class.
  	StorageClass string `json:"storageClass"`
  }

  // String returns a string representation of the PV in the form
  // "cluster/name{Bytes:…, Cost/GiB*Hr:…, StorageClass:…}", or "<nil>" for a
  // nil PV.
  func (pv *PV) String() string {
  	if pv != nil {
  		return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s}", pv.Cluster, pv.Name, pv.Bytes, pv.CostPerGiBHour, pv.StorageClass)
  	}
  	return "<nil>"
  }