allocation.go 85 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426
  1. package costmodel
import (
	"fmt"
	"math"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/kubecost/cost-model/pkg/cloud"
	"github.com/kubecost/cost-model/pkg/env"
	"github.com/kubecost/cost-model/pkg/kubecost"
	"github.com/kubecost/cost-model/pkg/log"
	"github.com/kubecost/cost-model/pkg/prom"
	"github.com/kubecost/cost-model/pkg/util/timeutil"
	"k8s.io/apimachinery/pkg/labels"
)
  16. const (
  17. queryFmtPods = `avg(kube_pod_container_status_running{}) by (pod, namespace, %s)[%s:%s]`
  18. queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s, provider_id)`
  19. queryFmtRAMRequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  20. queryFmtRAMUsageAvg = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  21. queryFmtRAMUsageMax = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  22. queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  23. queryFmtCPURequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  24. queryFmtCPUUsageAvg = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  25. queryFmtCPUUsageMax = `max(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  26. queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  27. queryFmtGPUsAllocated = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  28. queryFmtNodeCostPerCPUHr = `avg(avg_over_time(node_cpu_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
  29. queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
  30. queryFmtNodeCostPerGPUHr = `avg(avg_over_time(node_gpu_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
  31. queryFmtNodeIsSpot = `avg_over_time(kubecost_node_is_spot[%s])`
  32. queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]`
  33. queryFmtPVBytes = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (persistentvolume, %s)`
  34. queryFmtPodPVCAllocation = `avg(avg_over_time(pod_pvc_allocation[%s])) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
  35. queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s])) by (persistentvolumeclaim, namespace, %s)`
  36. queryFmtPVCostPerGiBHour = `avg(avg_over_time(pv_hourly_cost[%s])) by (volumename, %s)`
  37. queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  38. queryFmtNetZoneCostPerGiB = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s])) by (%s)`
  39. queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  40. queryFmtNetRegionCostPerGiB = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s])) by (%s)`
  41. queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  42. queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s])) by (%s)`
  43. queryFmtNetReceiveBytes = `sum(increase(container_network_receive_bytes_total{pod!="", container="POD"}[%s])) by (pod_name, pod, namespace, %s)`
  44. queryFmtNetTransferBytes = `sum(increase(container_network_transmit_bytes_total{pod!="", container="POD"}[%s])) by (pod_name, pod, namespace, %s)`
  45. queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels[%s])`
  46. queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations[%s])`
  47. queryFmtPodLabels = `avg_over_time(kube_pod_labels[%s])`
  48. queryFmtPodAnnotations = `avg_over_time(kube_pod_annotations[%s])`
  49. queryFmtServiceLabels = `avg_over_time(service_selector_labels[%s])`
  50. queryFmtDeploymentLabels = `avg_over_time(deployment_match_labels[%s])`
  51. queryFmtStatefulSetLabels = `avg_over_time(statefulSet_match_labels[%s])`
  52. queryFmtDaemonSetLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s])) by (pod, owner_name, namespace, %s)`
  53. queryFmtJobLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s])) by (pod, owner_name, namespace ,%s)`
  54. queryFmtPodsWithReplicaSetOwner = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet"}[%s])) by (pod, owner_name, namespace ,%s)`
  55. queryFmtReplicaSetsWithoutOwners = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>"}[%s])) by (replicaset, namespace, %s)`
  56. queryFmtLBCostPerHr = `avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s)`
  57. queryFmtLBActiveMins = `count(kubecost_load_balancer_cost) by (namespace, service_name, %s)[%s:%s]`
  58. )
  59. // This is a bit of a hack to work around garbage data from cadvisor
  60. // Ideally you cap each pod to the max CPU on its node, but that involves a bit more complexity, as it it would need to be done when allocations joins with asset data.
  61. const MAX_CPU_CAP = 512
  62. // CanCompute should return true if CostModel can act as a valid source for the
  63. // given time range. In the case of CostModel we want to attempt to compute as
  64. // long as the range starts in the past. If the CostModel ends up not having
  65. // data to match, that's okay, and should be communicated with an error
  66. // response from ComputeAllocation.
  67. func (cm *CostModel) CanCompute(start, end time.Time) bool {
  68. return start.Before(time.Now())
  69. }
  70. // Name returns the name of the Source
  71. func (cm *CostModel) Name() string {
  72. return "CostModel"
  73. }
  74. // ComputeAllocation uses the CostModel instance to compute an AllocationSet
  75. // for the window defined by the given start and end times. The Allocations
  76. // returned are unaggregated (i.e. down to the container level).
  77. func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  78. // If the duration is short enough, compute the AllocationSet directly
  79. if end.Sub(start) <= cm.MaxPrometheusQueryDuration {
  80. return cm.computeAllocation(start, end, resolution)
  81. }
  82. // If the duration exceeds the configured MaxPrometheusQueryDuration, then
  83. // query for maximum-sized AllocationSets, collect them, and accumulate.
  84. // s and e track the coverage of the entire given window over multiple
  85. // internal queries.
  86. s, e := start, start
  87. // Collect AllocationSets in a range, then accumulate
  88. // TODO optimize by collecting consecutive AllocationSets, accumulating as we go
  89. asr := kubecost.NewAllocationSetRange()
  90. for e.Before(end) {
  91. // By default, query for the full remaining duration. But do not let
  92. // any individual query duration exceed the configured max Prometheus
  93. // query duration.
  94. duration := end.Sub(e)
  95. if duration > cm.MaxPrometheusQueryDuration {
  96. duration = cm.MaxPrometheusQueryDuration
  97. }
  98. // Set start and end parameters (s, e) for next individual computation.
  99. e = s.Add(duration)
  100. // Compute the individual AllocationSet for just (s, e)
  101. as, err := cm.computeAllocation(s, e, resolution)
  102. if err != nil {
  103. return kubecost.NewAllocationSet(start, end), fmt.Errorf("error computing allocation for %s: %s", kubecost.NewClosedWindow(s, e), err)
  104. }
  105. // Append to the range
  106. asr.Append(as)
  107. // Set s equal to e to set up the next query, if one exists.
  108. s = e
  109. }
  110. // Populate annotations, labels, and services on each Allocation. This is
  111. // necessary because Properties.Intersection does not propagate any values
  112. // stored in maps or slices for performance reasons. In this case, however,
  113. // it is both acceptable and necessary to do so.
  114. allocationAnnotations := map[string]map[string]string{}
  115. allocationLabels := map[string]map[string]string{}
  116. allocationServices := map[string]map[string]bool{}
  117. // Also record errors and warnings, then append them to the results later.
  118. errors := []string{}
  119. warnings := []string{}
  120. asr.Each(func(i int, as *kubecost.AllocationSet) {
  121. as.Each(func(k string, a *kubecost.Allocation) {
  122. if len(a.Properties.Annotations) > 0 {
  123. if _, ok := allocationAnnotations[k]; !ok {
  124. allocationAnnotations[k] = map[string]string{}
  125. }
  126. for name, val := range a.Properties.Annotations {
  127. allocationAnnotations[k][name] = val
  128. }
  129. }
  130. if len(a.Properties.Labels) > 0 {
  131. if _, ok := allocationLabels[k]; !ok {
  132. allocationLabels[k] = map[string]string{}
  133. }
  134. for name, val := range a.Properties.Labels {
  135. allocationLabels[k][name] = val
  136. }
  137. }
  138. if len(a.Properties.Services) > 0 {
  139. if _, ok := allocationServices[k]; !ok {
  140. allocationServices[k] = map[string]bool{}
  141. }
  142. for _, val := range a.Properties.Services {
  143. allocationServices[k][val] = true
  144. }
  145. }
  146. })
  147. errors = append(errors, as.Errors...)
  148. warnings = append(warnings, as.Warnings...)
  149. })
  150. // Accumulate to yield the result AllocationSet. After this step, we will
  151. // be nearly complete, but without the raw allocation data, which must be
  152. // recomputed.
  153. result, err := asr.Accumulate()
  154. if err != nil {
  155. return kubecost.NewAllocationSet(start, end), fmt.Errorf("error accumulating data for %s: %s", kubecost.NewClosedWindow(s, e), err)
  156. }
  157. // Apply the annotations, labels, and services to the post-accumulation
  158. // results. (See above for why this is necessary.)
  159. result.Each(func(k string, a *kubecost.Allocation) {
  160. if annotations, ok := allocationAnnotations[k]; ok {
  161. a.Properties.Annotations = annotations
  162. }
  163. if labels, ok := allocationLabels[k]; ok {
  164. a.Properties.Labels = labels
  165. }
  166. if services, ok := allocationServices[k]; ok {
  167. a.Properties.Services = []string{}
  168. for s := range services {
  169. a.Properties.Services = append(a.Properties.Services, s)
  170. }
  171. }
  172. // Expand the Window of all Allocations within the AllocationSet
  173. // to match the Window of the AllocationSet, which gets expanded
  174. // at the end of this function.
  175. a.Window = a.Window.ExpandStart(start).ExpandEnd(end)
  176. })
  177. // Maintain RAM and CPU max usage values by iterating over the range,
  178. // computing maximums on a rolling basis, and setting on the result set.
  179. asr.Each(func(i int, as *kubecost.AllocationSet) {
  180. as.Each(func(key string, alloc *kubecost.Allocation) {
  181. resultAlloc := result.Get(key)
  182. if resultAlloc == nil {
  183. return
  184. }
  185. if resultAlloc.RawAllocationOnly == nil {
  186. resultAlloc.RawAllocationOnly = &kubecost.RawAllocationOnlyData{}
  187. }
  188. if alloc.RawAllocationOnly == nil {
  189. // This will happen inevitably for unmounted disks, but should
  190. // ideally not happen for any allocation with CPU and RAM data.
  191. if !alloc.IsUnmounted() {
  192. log.DedupedWarningf(10, "ComputeAllocation: raw allocation data missing for %s", key)
  193. }
  194. return
  195. }
  196. if alloc.RawAllocationOnly.CPUCoreUsageMax > resultAlloc.RawAllocationOnly.CPUCoreUsageMax {
  197. resultAlloc.RawAllocationOnly.CPUCoreUsageMax = alloc.RawAllocationOnly.CPUCoreUsageMax
  198. }
  199. if alloc.RawAllocationOnly.RAMBytesUsageMax > resultAlloc.RawAllocationOnly.RAMBytesUsageMax {
  200. resultAlloc.RawAllocationOnly.RAMBytesUsageMax = alloc.RawAllocationOnly.RAMBytesUsageMax
  201. }
  202. })
  203. })
  204. // Expand the window to match the queried time range.
  205. result.Window = result.Window.ExpandStart(start).ExpandEnd(end)
  206. // Append errors and warnings
  207. result.Errors = errors
  208. result.Warnings = warnings
  209. return result, nil
  210. }
  211. func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  212. // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
  213. // 2. Run and apply the results of the remaining queries to
  214. // 3. Build out AllocationSet from completed Pod map
  215. // Create a window spanning the requested query
  216. window := kubecost.NewWindow(&start, &end)
  217. // Create an empty AllocationSet. For safety, in the case of an error, we
  218. // should prefer to return this empty set with the error. (In the case of
  219. // no error, of course we populate the set and return it.)
  220. allocSet := kubecost.NewAllocationSet(start, end)
  221. // (1) Build out Pod map
  222. // Build out a map of Allocations as a mapping from pod-to-container-to-
  223. // underlying-Allocation instance, starting with (start, end) so that we
  224. // begin with minutes, from which we compute resource allocation and cost
  225. // totals from measured rate data.
  226. podMap := map[podKey]*Pod{}
  227. // clusterStarts and clusterEnds record the earliest start and latest end
  228. // times, respectively, on a cluster-basis. These are used for unmounted
  229. // PVs and other "virtual" Allocations so that minutes are maximally
  230. // accurate during start-up or spin-down of a cluster
  231. clusterStart := map[string]time.Time{}
  232. clusterEnd := map[string]time.Time{}
  233. // TODO:CLEANUP remove "max batch" idea and clusterStart/End
  234. cm.buildPodMap(window, resolution, env.GetETLMaxPrometheusQueryDuration(), podMap, clusterStart, clusterEnd)
  235. // (2) Run and apply remaining queries
  236. // Query for the duration between start and end
  237. durStr := timeutil.DurationString(end.Sub(start))
  238. if durStr == "" {
  239. return allocSet, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  240. }
  241. // Convert resolution duration to a query-ready string
  242. resStr := timeutil.DurationString(resolution)
  243. ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
  244. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, env.GetPromClusterLabel())
  245. resChRAMBytesAllocated := ctx.QueryAtTime(queryRAMBytesAllocated, end)
  246. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, env.GetPromClusterLabel())
  247. resChRAMRequests := ctx.QueryAtTime(queryRAMRequests, end)
  248. queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, durStr, env.GetPromClusterLabel())
  249. resChRAMUsageAvg := ctx.QueryAtTime(queryRAMUsageAvg, end)
  250. queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, durStr, env.GetPromClusterLabel())
  251. resChRAMUsageMax := ctx.QueryAtTime(queryRAMUsageMax, end)
  252. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, env.GetPromClusterLabel())
  253. resChCPUCoresAllocated := ctx.QueryAtTime(queryCPUCoresAllocated, end)
  254. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, env.GetPromClusterLabel())
  255. resChCPURequests := ctx.QueryAtTime(queryCPURequests, end)
  256. queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, durStr, env.GetPromClusterLabel())
  257. resChCPUUsageAvg := ctx.QueryAtTime(queryCPUUsageAvg, end)
  258. queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMax, durStr, env.GetPromClusterLabel())
  259. resChCPUUsageMax := ctx.QueryAtTime(queryCPUUsageMax, end)
  260. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, env.GetPromClusterLabel())
  261. resChGPUsRequested := ctx.QueryAtTime(queryGPUsRequested, end)
  262. queryGPUsAllocated := fmt.Sprintf(queryFmtGPUsAllocated, durStr, env.GetPromClusterLabel())
  263. resChGPUsAllocated := ctx.QueryAtTime(queryGPUsAllocated, end)
  264. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, env.GetPromClusterLabel())
  265. resChNodeCostPerCPUHr := ctx.QueryAtTime(queryNodeCostPerCPUHr, end)
  266. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, env.GetPromClusterLabel())
  267. resChNodeCostPerRAMGiBHr := ctx.QueryAtTime(queryNodeCostPerRAMGiBHr, end)
  268. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, env.GetPromClusterLabel())
  269. resChNodeCostPerGPUHr := ctx.QueryAtTime(queryNodeCostPerGPUHr, end)
  270. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr)
  271. resChNodeIsSpot := ctx.QueryAtTime(queryNodeIsSpot, end)
  272. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, env.GetPromClusterLabel(), durStr, resStr)
  273. resChPVCInfo := ctx.QueryAtTime(queryPVCInfo, end)
  274. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, env.GetPromClusterLabel())
  275. resChPVBytes := ctx.QueryAtTime(queryPVBytes, end)
  276. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, env.GetPromClusterLabel())
  277. resChPodPVCAllocation := ctx.QueryAtTime(queryPodPVCAllocation, end)
  278. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, env.GetPromClusterLabel())
  279. resChPVCBytesRequested := ctx.QueryAtTime(queryPVCBytesRequested, end)
  280. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, env.GetPromClusterLabel())
  281. resChPVCostPerGiBHour := ctx.QueryAtTime(queryPVCostPerGiBHour, end)
  282. queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, durStr, env.GetPromClusterLabel())
  283. resChNetTransferBytes := ctx.QueryAtTime(queryNetTransferBytes, end)
  284. queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, durStr, env.GetPromClusterLabel())
  285. resChNetReceiveBytes := ctx.QueryAtTime(queryNetReceiveBytes, end)
  286. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, env.GetPromClusterLabel())
  287. resChNetZoneGiB := ctx.QueryAtTime(queryNetZoneGiB, end)
  288. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, env.GetPromClusterLabel())
  289. resChNetZoneCostPerGiB := ctx.QueryAtTime(queryNetZoneCostPerGiB, end)
  290. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, env.GetPromClusterLabel())
  291. resChNetRegionGiB := ctx.QueryAtTime(queryNetRegionGiB, end)
  292. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, env.GetPromClusterLabel())
  293. resChNetRegionCostPerGiB := ctx.QueryAtTime(queryNetRegionCostPerGiB, end)
  294. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, env.GetPromClusterLabel())
  295. resChNetInternetGiB := ctx.QueryAtTime(queryNetInternetGiB, end)
  296. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, env.GetPromClusterLabel())
  297. resChNetInternetCostPerGiB := ctx.QueryAtTime(queryNetInternetCostPerGiB, end)
  298. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr)
  299. resChNamespaceLabels := ctx.QueryAtTime(queryNamespaceLabels, end)
  300. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr)
  301. resChNamespaceAnnotations := ctx.QueryAtTime(queryNamespaceAnnotations, end)
  302. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr)
  303. resChPodLabels := ctx.QueryAtTime(queryPodLabels, end)
  304. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr)
  305. resChPodAnnotations := ctx.QueryAtTime(queryPodAnnotations, end)
  306. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr)
  307. resChServiceLabels := ctx.QueryAtTime(queryServiceLabels, end)
  308. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr)
  309. resChDeploymentLabels := ctx.QueryAtTime(queryDeploymentLabels, end)
  310. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr)
  311. resChStatefulSetLabels := ctx.QueryAtTime(queryStatefulSetLabels, end)
  312. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, env.GetPromClusterLabel())
  313. resChDaemonSetLabels := ctx.QueryAtTime(queryDaemonSetLabels, end)
  314. queryPodsWithReplicaSetOwner := fmt.Sprintf(queryFmtPodsWithReplicaSetOwner, durStr, env.GetPromClusterLabel())
  315. resChPodsWithReplicaSetOwner := ctx.QueryAtTime(queryPodsWithReplicaSetOwner, end)
  316. queryReplicaSetsWithoutOwners := fmt.Sprintf(queryFmtReplicaSetsWithoutOwners, durStr, env.GetPromClusterLabel())
  317. resChReplicaSetsWithoutOwners := ctx.QueryAtTime(queryReplicaSetsWithoutOwners, end)
  318. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, env.GetPromClusterLabel())
  319. resChJobLabels := ctx.QueryAtTime(queryJobLabels, end)
  320. queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, durStr, env.GetPromClusterLabel())
  321. resChLBCostPerHr := ctx.QueryAtTime(queryLBCostPerHr, end)
  322. queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, env.GetPromClusterLabel(), durStr, resStr)
  323. resChLBActiveMins := ctx.QueryAtTime(queryLBActiveMins, end)
  324. resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
  325. resCPURequests, _ := resChCPURequests.Await()
  326. resCPUUsageAvg, _ := resChCPUUsageAvg.Await()
  327. resCPUUsageMax, _ := resChCPUUsageMax.Await()
  328. resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
  329. resRAMRequests, _ := resChRAMRequests.Await()
  330. resRAMUsageAvg, _ := resChRAMUsageAvg.Await()
  331. resRAMUsageMax, _ := resChRAMUsageMax.Await()
  332. resGPUsRequested, _ := resChGPUsRequested.Await()
  333. resGPUsAllocated, _ := resChGPUsAllocated.Await()
  334. resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
  335. resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
  336. resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
  337. resNodeIsSpot, _ := resChNodeIsSpot.Await()
  338. resPVBytes, _ := resChPVBytes.Await()
  339. resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
  340. resPVCInfo, _ := resChPVCInfo.Await()
  341. resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
  342. resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
  343. resNetTransferBytes, _ := resChNetTransferBytes.Await()
  344. resNetReceiveBytes, _ := resChNetReceiveBytes.Await()
  345. resNetZoneGiB, _ := resChNetZoneGiB.Await()
  346. resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
  347. resNetRegionGiB, _ := resChNetRegionGiB.Await()
  348. resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
  349. resNetInternetGiB, _ := resChNetInternetGiB.Await()
  350. resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
  351. resNamespaceLabels, _ := resChNamespaceLabels.Await()
  352. resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
  353. resPodLabels, _ := resChPodLabels.Await()
  354. resPodAnnotations, _ := resChPodAnnotations.Await()
  355. resServiceLabels, _ := resChServiceLabels.Await()
  356. resDeploymentLabels, _ := resChDeploymentLabels.Await()
  357. resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
  358. resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
  359. resPodsWithReplicaSetOwner, _ := resChPodsWithReplicaSetOwner.Await()
  360. resReplicaSetsWithoutOwners, _ := resChReplicaSetsWithoutOwners.Await()
  361. resJobLabels, _ := resChJobLabels.Await()
  362. resLBCostPerHr, _ := resChLBCostPerHr.Await()
  363. resLBActiveMins, _ := resChLBActiveMins.Await()
  364. if ctx.HasErrors() {
  365. for _, err := range ctx.Errors() {
  366. log.Errorf("CostModel.ComputeAllocation: %s", err)
  367. }
  368. return allocSet, ctx.ErrorCollection()
  369. }
  370. // We choose to apply allocation before requests in the cases of RAM and
  371. // CPU so that we can assert that allocation should always be greater than
  372. // or equal to request.
  373. applyCPUCoresAllocated(podMap, resCPUCoresAllocated)
  374. applyCPUCoresRequested(podMap, resCPURequests)
  375. applyCPUCoresUsedAvg(podMap, resCPUUsageAvg)
  376. applyCPUCoresUsedMax(podMap, resCPUUsageMax)
  377. applyRAMBytesAllocated(podMap, resRAMBytesAllocated)
  378. applyRAMBytesRequested(podMap, resRAMRequests)
  379. applyRAMBytesUsedAvg(podMap, resRAMUsageAvg)
  380. applyRAMBytesUsedMax(podMap, resRAMUsageMax)
  381. applyGPUsAllocated(podMap, resGPUsRequested, resGPUsAllocated)
  382. applyNetworkTotals(podMap, resNetTransferBytes, resNetReceiveBytes)
  383. applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB)
  384. applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB)
  385. applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB)
  386. namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
  387. podLabels := resToPodLabels(resPodLabels)
  388. namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
  389. podAnnotations := resToPodAnnotations(resPodAnnotations)
  390. applyLabels(podMap, namespaceLabels, podLabels)
  391. applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
  392. serviceLabels := getServiceLabels(resServiceLabels)
  393. allocsByService := map[serviceKey][]*kubecost.Allocation{}
  394. applyServicesToPods(podMap, podLabels, allocsByService, serviceLabels)
  395. podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
  396. podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
  397. podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels)
  398. podJobMap := resToPodJobMap(resJobLabels)
  399. podReplicaSetMap := resToPodReplicaSetMap(resPodsWithReplicaSetOwner, resReplicaSetsWithoutOwners)
  400. applyControllersToPods(podMap, podDeploymentMap)
  401. applyControllersToPods(podMap, podStatefulSetMap)
  402. applyControllersToPods(podMap, podDaemonSetMap)
  403. applyControllersToPods(podMap, podJobMap)
  404. applyControllersToPods(podMap, podReplicaSetMap)
  405. // TODO breakdown network costs?
  406. // Build out a map of Nodes with resource costs, discounts, and node types
  407. // for converting resource allocation data to cumulative costs.
  408. nodeMap := map[nodeKey]*NodePricing{}
  409. applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
  410. applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
  411. applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
  412. applyNodeSpot(nodeMap, resNodeIsSpot)
  413. applyNodeDiscount(nodeMap, cm)
  414. // Build out the map of all PVs with class, size and cost-per-hour.
  415. // Note: this does not record time running, which we may want to
  416. // include later for increased PV precision. (As long as the PV has
  417. // a PVC, we get time running there, so this is only inaccurate
  418. // for short-lived, unmounted PVs.)
  419. pvMap := map[pvKey]*PV{}
  420. buildPVMap(pvMap, resPVCostPerGiBHour)
  421. applyPVBytes(pvMap, resPVBytes)
  422. // Build out the map of all PVCs with time running, bytes requested,
  423. // and connect to the correct PV from pvMap. (If no PV exists, that
  424. // is noted, but does not result in any allocation/cost.)
  425. pvcMap := map[pvcKey]*PVC{}
  426. buildPVCMap(window, pvcMap, pvMap, resPVCInfo)
  427. applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
  428. // Build out the relationships of pods to their PVCs. This step
  429. // populates the PVC.Count field so that PVC allocation can be
  430. // split appropriately among each pod's container allocation.
  431. podPVCMap := map[podKey][]*PVC{}
  432. buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation)
  433. // Because PVCs can be shared among pods, the respective PV cost
  434. // needs to be evenly distributed to those pods based on time
  435. // running, as well as the amount of time the PVC was shared.
  436. // Build a relation between every PVC to the pods that mount it
  437. // and a window representing the interval during which they
  438. // were associated.
  439. pvcPodIntervalMap := make(map[pvcKey]map[podKey]kubecost.Window)
  440. for _, pod := range podMap {
  441. for _, alloc := range pod.Allocations {
  442. cluster := alloc.Properties.Cluster
  443. namespace := alloc.Properties.Namespace
  444. pod := alloc.Properties.Pod
  445. thisPodKey := newPodKey(cluster, namespace, pod)
  446. if pvcs, ok := podPVCMap[thisPodKey]; ok {
  447. for _, pvc := range pvcs {
  448. // Determine the (start, end) of the relationship between the
  449. // given PVC and the associated Allocation so that a precise
  450. // number of hours can be used to compute cumulative cost.
  451. s, e := alloc.Start, alloc.End
  452. if pvc.Start.After(alloc.Start) {
  453. s = pvc.Start
  454. }
  455. if pvc.End.Before(alloc.End) {
  456. e = pvc.End
  457. }
  458. thisPVCKey := newPVCKey(cluster, namespace, pvc.Name)
  459. if pvcPodIntervalMap[thisPVCKey] == nil {
  460. pvcPodIntervalMap[thisPVCKey] = make(map[podKey]kubecost.Window)
  461. }
  462. pvcPodIntervalMap[thisPVCKey][thisPodKey] = kubecost.NewWindow(&s, &e)
  463. }
  464. }
  465. // We only need to look at one alloc per pod
  466. break
  467. }
  468. }
  469. // Build out a PV price coefficient for each pod with a PVC. Each
  470. // PVC-pod relation needs a coefficient which modifies the PV cost
  471. // such that PV costs can be shared between all pods using that PVC.
  472. sharedPVCCostCoefficientMap := make(map[pvcKey]map[podKey][]CoefficientComponent)
  473. for pvcKey, podIntervalMap := range pvcPodIntervalMap {
  474. // Get single-point intervals from alloc-PVC relation windows.
  475. intervals := getIntervalPointsFromWindows(podIntervalMap)
  476. // Determine coefficients for each PVC-pod relation.
  477. sharedPVCCostCoefficientMap[pvcKey] = getPVCCostCoefficients(intervals, podIntervalMap)
  478. }
  479. // Identify unmounted PVs (PVs without PVCs) and add one Allocation per
  480. // cluster representing each cluster's unmounted PVs (if necessary).
  481. applyUnmountedPVs(window, podMap, pvMap, pvcMap)
  482. lbMap := getLoadBalancerCosts(resLBCostPerHr, resLBActiveMins, resolution)
  483. applyLoadBalancersToPods(lbMap, allocsByService)
  484. // (3) Build out AllocationSet from Pod map
  485. for _, pod := range podMap {
  486. for _, alloc := range pod.Allocations {
  487. cluster := alloc.Properties.Cluster
  488. nodeName := alloc.Properties.Node
  489. namespace := alloc.Properties.Namespace
  490. pod := alloc.Properties.Pod
  491. container := alloc.Properties.Container
  492. podKey := newPodKey(cluster, namespace, pod)
  493. nodeKey := newNodeKey(cluster, nodeName)
  494. node := cm.getNodePricing(nodeMap, nodeKey)
  495. alloc.Properties.ProviderID = node.ProviderID
  496. alloc.CPUCost = alloc.CPUCoreHours * node.CostPerCPUHr
  497. alloc.RAMCost = (alloc.RAMByteHours / 1024 / 1024 / 1024) * node.CostPerRAMGiBHr
  498. alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
  499. if pvcs, ok := podPVCMap[podKey]; ok {
  500. for _, pvc := range pvcs {
  501. pvcKey := newPVCKey(cluster, namespace, pvc.Name)
  502. s, e := alloc.Start, alloc.End
  503. if pvcInterval, ok := pvcPodIntervalMap[pvcKey][podKey]; ok {
  504. s, e = *pvcInterval.Start(), *pvcInterval.End()
  505. } else {
  506. log.Warningf("CostModel.ComputeAllocation: allocation %s and PVC %s have no associated active window", alloc.Name, pvc.Name)
  507. }
  508. minutes := e.Sub(s).Minutes()
  509. hrs := minutes / 60.0
  510. count := float64(pvc.Count)
  511. if pvc.Count < 1 {
  512. count = 1
  513. }
  514. gib := pvc.Bytes / 1024 / 1024 / 1024
  515. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  516. // Scale PV cost by PVC sharing coefficient.
  517. if coeffComponents, ok := sharedPVCCostCoefficientMap[pvcKey][podKey]; ok {
  518. cost *= getCoefficientFromComponents(coeffComponents)
  519. } else {
  520. log.Warningf("CostModel.ComputeAllocation: allocation %s and PVC %s have relation but no coeff", alloc.Name, pvc.Name)
  521. }
  522. // Apply the size and cost of the PV to the allocation, each
  523. // weighted by count (i.e. the number of containers in the pod)
  524. // record the amount of total PVBytes Hours attributable to a given PV
  525. if alloc.PVs == nil {
  526. alloc.PVs = kubecost.PVAllocations{}
  527. }
  528. pvKey := kubecost.PVKey{
  529. Cluster: pvc.Cluster,
  530. Name: pvc.Volume.Name,
  531. }
  532. alloc.PVs[pvKey] = &kubecost.PVAllocation{
  533. ByteHours: pvc.Bytes * hrs / count,
  534. Cost: cost / count,
  535. }
  536. }
  537. }
  538. // Make sure that the name is correct (node may not be present at this
  539. // point due to it missing from queryMinutes) then insert.
  540. alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, pod, container)
  541. allocSet.Set(alloc)
  542. }
  543. }
  544. return allocSet, nil
  545. }
  546. func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSize time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time) error {
  547. // Assumes that window is positive and closed
  548. start, end := *window.Start(), *window.End()
  549. // Convert resolution duration to a query-ready string
  550. resStr := timeutil.DurationString(resolution)
  551. ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
  552. // Query for (start, end) by (pod, namespace, cluster) over the given
  553. // window, using the given resolution, and if necessary in batches no
  554. // larger than the given maximum batch size. If working in batches, track
  555. // overall progress by starting with (window.start, window.start) and
  556. // querying in batches no larger than maxBatchSize from start-to-end,
  557. // folding each result set into podMap as the results come back.
  558. coverage := kubecost.NewWindow(&start, &start)
  559. numQuery := 1
  560. for coverage.End().Before(end) {
  561. // Determine the (start, end) of the current batch
  562. batchStart := *coverage.End()
  563. batchEnd := coverage.End().Add(maxBatchSize)
  564. if batchEnd.After(end) {
  565. batchEnd = end
  566. }
  567. var resPods []*prom.QueryResult
  568. var err error
  569. maxTries := 3
  570. numTries := 0
  571. for resPods == nil && numTries < maxTries {
  572. numTries++
  573. // Query for the duration between start and end
  574. durStr := timeutil.DurationString(batchEnd.Sub(batchStart))
  575. if durStr == "" {
  576. // Negative duration, so set empty results and don't query
  577. resPods = []*prom.QueryResult{}
  578. err = nil
  579. break
  580. }
  581. // Submit and profile query
  582. queryPods := fmt.Sprintf(queryFmtPods, env.GetPromClusterLabel(), durStr, resStr)
  583. queryProfile := time.Now()
  584. resPods, err = ctx.QueryAtTime(queryPods, batchEnd).Await()
  585. if err != nil {
  586. log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
  587. resPods = nil
  588. }
  589. }
  590. if err != nil {
  591. return err
  592. }
  593. applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods)
  594. coverage = coverage.ExpandEnd(batchEnd)
  595. numQuery++
  596. }
  597. return nil
  598. }
  599. func applyPodResults(window kubecost.Window, resolution time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, resPods []*prom.QueryResult) {
  600. for _, res := range resPods {
  601. if len(res.Values) == 0 {
  602. log.Warningf("CostModel.ComputeAllocation: empty minutes result")
  603. continue
  604. }
  605. cluster, err := res.GetString(env.GetPromClusterLabel())
  606. if err != nil {
  607. cluster = env.GetClusterID()
  608. }
  609. labels, err := res.GetStrings("namespace", "pod")
  610. if err != nil {
  611. log.Warningf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
  612. continue
  613. }
  614. namespace := labels["namespace"]
  615. pod := labels["pod"]
  616. key := newPodKey(cluster, namespace, pod)
  617. // allocStart and allocEnd are the timestamps of the first and last
  618. // minutes the pod was running, respectively. We subtract one resolution
  619. // from allocStart because this point will actually represent the end
  620. // of the first minute. We don't subtract from allocEnd because it
  621. // already represents the end of the last minute.
  622. var allocStart, allocEnd time.Time
  623. startAdjustmentCoeff, endAdjustmentCoeff := 1.0, 1.0
  624. for _, datum := range res.Values {
  625. t := time.Unix(int64(datum.Timestamp), 0)
  626. if allocStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  627. // Set the start timestamp to the earliest non-zero timestamp
  628. allocStart = t
  629. // Record adjustment coefficient, i.e. the portion of the start
  630. // timestamp to "ignore". That is, sometimes the value will be
  631. // 0.5, meaning that we should discount the time running by
  632. // half of the resolution the timestamp stands for.
  633. startAdjustmentCoeff = (1.0 - datum.Value)
  634. }
  635. if datum.Value > 0 && window.Contains(t) {
  636. // Set the end timestamp to the latest non-zero timestamp
  637. allocEnd = t
  638. // Record adjustment coefficient, i.e. the portion of the end
  639. // timestamp to "ignore". (See explanation above for start.)
  640. endAdjustmentCoeff = (1.0 - datum.Value)
  641. }
  642. }
  643. if allocStart.IsZero() || allocEnd.IsZero() {
  644. continue
  645. }
  646. // Adjust timestamps according to the resolution and the adjustment
  647. // coefficients, as described above. That is, count the start timestamp
  648. // from the beginning of the resolution, not the end. Then "reduce" the
  649. // start and end by the correct amount, in the case that the "running"
  650. // value of the first or last timestamp was not a full 1.0.
  651. allocStart = allocStart.Add(-resolution)
  652. // Note: the *100 and /100 are necessary because Duration is an int, so
  653. // 0.5, for instance, will be truncated, resulting in no adjustment.
  654. allocStart = allocStart.Add(time.Duration(startAdjustmentCoeff*100) * resolution / time.Duration(100))
  655. allocEnd = allocEnd.Add(-time.Duration(endAdjustmentCoeff*100) * resolution / time.Duration(100))
  656. // Ensure that the allocStart is always within the window, adjusting
  657. // for the occasions where start falls 1m before the query window.
  658. // NOTE: window here will always be closed (so no need to nil check
  659. // "start").
  660. // TODO:CLEANUP revisit query methodology to figure out why this is
  661. // happening on occasion
  662. if allocStart.Before(*window.Start()) {
  663. allocStart = *window.Start()
  664. }
  665. // If there is only one point with a value <= 0.5 that the start and
  666. // end timestamps both share, then we will enter this case because at
  667. // least half of a resolution will be subtracted from both the start
  668. // and the end. If that is the case, then add back half of each side
  669. // so that the pod is said to run for half a resolution total.
  670. // e.g. For resolution 1m and a value of 0.5 at one timestamp, we'll
  671. // end up with allocEnd == allocStart and each coeff == 0.5. In
  672. // that case, add 0.25m to each side, resulting in 0.5m duration.
  673. if !allocEnd.After(allocStart) {
  674. allocStart = allocStart.Add(-time.Duration(50*startAdjustmentCoeff) * resolution / time.Duration(100))
  675. allocEnd = allocEnd.Add(time.Duration(50*endAdjustmentCoeff) * resolution / time.Duration(100))
  676. }
  677. // Ensure that the allocEnf is always within the window, adjusting
  678. // for the occasions where end falls 1m after the query window. This
  679. // has not ever happened, but is symmetrical with the start check
  680. // above.
  681. // NOTE: window here will always be closed (so no need to nil check
  682. // "end").
  683. // TODO:CLEANUP revisit query methodology to figure out why this is
  684. // happening on occasion
  685. if allocEnd.After(*window.End()) {
  686. allocEnd = *window.End()
  687. }
  688. // Set start if unset or this datum's start time is earlier than the
  689. // current earliest time.
  690. if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
  691. clusterStart[cluster] = allocStart
  692. }
  693. // Set end if unset or this datum's end time is later than the
  694. // current latest time.
  695. if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
  696. clusterEnd[cluster] = allocEnd
  697. }
  698. if pod, ok := podMap[key]; ok {
  699. // Pod has already been recorded, so update it accordingly
  700. if allocStart.Before(pod.Start) {
  701. pod.Start = allocStart
  702. }
  703. if allocEnd.After(pod.End) {
  704. pod.End = allocEnd
  705. }
  706. } else {
  707. // Pod has not been recorded yet, so insert it
  708. podMap[key] = &Pod{
  709. Window: window.Clone(),
  710. Start: allocStart,
  711. End: allocEnd,
  712. Key: key,
  713. Allocations: map[string]*kubecost.Allocation{},
  714. }
  715. }
  716. }
  717. }
  718. func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom.QueryResult) {
  719. for _, res := range resCPUCoresAllocated {
  720. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  721. if err != nil {
  722. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
  723. continue
  724. }
  725. pod, ok := podMap[key]
  726. if !ok {
  727. continue
  728. }
  729. container, err := res.GetString("container")
  730. if err != nil {
  731. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'container': %s", key)
  732. continue
  733. }
  734. if _, ok := pod.Allocations[container]; !ok {
  735. pod.AppendContainer(container)
  736. }
  737. cpuCores := res.Values[0].Value
  738. if cpuCores > MAX_CPU_CAP {
  739. log.Infof("[WARNING] Very large cpu allocation, clamping to %f", res.Values[0].Value*(pod.Allocations[container].Minutes()/60.0))
  740. cpuCores = 0.0
  741. }
  742. hours := pod.Allocations[container].Minutes() / 60.0
  743. pod.Allocations[container].CPUCoreHours = cpuCores * hours
  744. node, err := res.GetString("node")
  745. if err != nil {
  746. log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
  747. continue
  748. }
  749. pod.Allocations[container].Properties.Node = node
  750. }
  751. }
  752. func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom.QueryResult) {
  753. for _, res := range resCPUCoresRequested {
  754. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  755. if err != nil {
  756. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
  757. continue
  758. }
  759. pod, ok := podMap[key]
  760. if !ok {
  761. continue
  762. }
  763. container, err := res.GetString("container")
  764. if err != nil {
  765. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'container': %s", key)
  766. continue
  767. }
  768. if _, ok := pod.Allocations[container]; !ok {
  769. pod.AppendContainer(container)
  770. }
  771. pod.Allocations[container].CPUCoreRequestAverage = res.Values[0].Value
  772. // If CPU allocation is less than requests, set CPUCoreHours to
  773. // request level.
  774. if pod.Allocations[container].CPUCores() < res.Values[0].Value {
  775. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  776. }
  777. if pod.Allocations[container].CPUCores() > MAX_CPU_CAP {
  778. log.Infof("[WARNING] Very large cpu allocation, clamping! to %f", res.Values[0].Value*(pod.Allocations[container].Minutes()/60.0))
  779. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  780. }
  781. node, err := res.GetString("node")
  782. if err != nil {
  783. log.Warningf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
  784. continue
  785. }
  786. pod.Allocations[container].Properties.Node = node
  787. }
  788. }
  789. func applyCPUCoresUsedAvg(podMap map[podKey]*Pod, resCPUCoresUsedAvg []*prom.QueryResult) {
  790. for _, res := range resCPUCoresUsedAvg {
  791. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  792. if err != nil {
  793. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg result missing field: %s", err)
  794. continue
  795. }
  796. pod, ok := podMap[key]
  797. if !ok {
  798. continue
  799. }
  800. container, err := res.GetString("container")
  801. if container == "" || err != nil {
  802. container, err = res.GetString("container_name")
  803. if err != nil {
  804. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg query result missing 'container': %s", key)
  805. continue
  806. }
  807. }
  808. if _, ok := pod.Allocations[container]; !ok {
  809. pod.AppendContainer(container)
  810. }
  811. pod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
  812. if res.Values[0].Value > MAX_CPU_CAP {
  813. log.Infof("[WARNING] Very large cpu USAGE, dropping outlier")
  814. pod.Allocations[container].CPUCoreUsageAverage = 0.0
  815. }
  816. }
  817. }
  818. func applyCPUCoresUsedMax(podMap map[podKey]*Pod, resCPUCoresUsedMax []*prom.QueryResult) {
  819. for _, res := range resCPUCoresUsedMax {
  820. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  821. if err != nil {
  822. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max result missing field: %s", err)
  823. continue
  824. }
  825. pod, ok := podMap[key]
  826. if !ok {
  827. continue
  828. }
  829. container, err := res.GetString("container")
  830. if container == "" || err != nil {
  831. container, err = res.GetString("container_name")
  832. if err != nil {
  833. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max query result missing 'container': %s", key)
  834. continue
  835. }
  836. }
  837. if _, ok := pod.Allocations[container]; !ok {
  838. pod.AppendContainer(container)
  839. }
  840. if pod.Allocations[container].RawAllocationOnly == nil {
  841. pod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  842. CPUCoreUsageMax: res.Values[0].Value,
  843. }
  844. } else {
  845. pod.Allocations[container].RawAllocationOnly.CPUCoreUsageMax = res.Values[0].Value
  846. }
  847. }
  848. }
  849. func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom.QueryResult) {
  850. for _, res := range resRAMBytesAllocated {
  851. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  852. if err != nil {
  853. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
  854. continue
  855. }
  856. pod, ok := podMap[key]
  857. if !ok {
  858. continue
  859. }
  860. container, err := res.GetString("container")
  861. if err != nil {
  862. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'container': %s", key)
  863. continue
  864. }
  865. if _, ok := pod.Allocations[container]; !ok {
  866. pod.AppendContainer(container)
  867. }
  868. ramBytes := res.Values[0].Value
  869. hours := pod.Allocations[container].Minutes() / 60.0
  870. pod.Allocations[container].RAMByteHours = ramBytes * hours
  871. node, err := res.GetString("node")
  872. if err != nil {
  873. log.Warningf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
  874. continue
  875. }
  876. pod.Allocations[container].Properties.Node = node
  877. }
  878. }
  879. func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom.QueryResult) {
  880. for _, res := range resRAMBytesRequested {
  881. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  882. if err != nil {
  883. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
  884. continue
  885. }
  886. pod, ok := podMap[key]
  887. if !ok {
  888. continue
  889. }
  890. container, err := res.GetString("container")
  891. if err != nil {
  892. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'container': %s", key)
  893. continue
  894. }
  895. if _, ok := pod.Allocations[container]; !ok {
  896. pod.AppendContainer(container)
  897. }
  898. pod.Allocations[container].RAMBytesRequestAverage = res.Values[0].Value
  899. // If RAM allocation is less than requests, set RAMByteHours to
  900. // request level.
  901. if pod.Allocations[container].RAMBytes() < res.Values[0].Value {
  902. pod.Allocations[container].RAMByteHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  903. }
  904. node, err := res.GetString("node")
  905. if err != nil {
  906. log.Warningf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
  907. continue
  908. }
  909. pod.Allocations[container].Properties.Node = node
  910. }
  911. }
  912. func applyRAMBytesUsedAvg(podMap map[podKey]*Pod, resRAMBytesUsedAvg []*prom.QueryResult) {
  913. for _, res := range resRAMBytesUsedAvg {
  914. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  915. if err != nil {
  916. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM avg usage result missing field: %s", err)
  917. continue
  918. }
  919. pod, ok := podMap[key]
  920. if !ok {
  921. continue
  922. }
  923. container, err := res.GetString("container")
  924. if container == "" || err != nil {
  925. container, err = res.GetString("container_name")
  926. if err != nil {
  927. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage avg query result missing 'container': %s", key)
  928. continue
  929. }
  930. }
  931. if _, ok := pod.Allocations[container]; !ok {
  932. pod.AppendContainer(container)
  933. }
  934. pod.Allocations[container].RAMBytesUsageAverage = res.Values[0].Value
  935. }
  936. }
  937. func applyRAMBytesUsedMax(podMap map[podKey]*Pod, resRAMBytesUsedMax []*prom.QueryResult) {
  938. for _, res := range resRAMBytesUsedMax {
  939. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  940. if err != nil {
  941. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max result missing field: %s", err)
  942. continue
  943. }
  944. pod, ok := podMap[key]
  945. if !ok {
  946. continue
  947. }
  948. container, err := res.GetString("container")
  949. if container == "" || err != nil {
  950. container, err = res.GetString("container_name")
  951. if err != nil {
  952. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max query result missing 'container': %s", key)
  953. continue
  954. }
  955. }
  956. if _, ok := pod.Allocations[container]; !ok {
  957. pod.AppendContainer(container)
  958. }
  959. if pod.Allocations[container].RawAllocationOnly == nil {
  960. pod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  961. RAMBytesUsageMax: res.Values[0].Value,
  962. }
  963. } else {
  964. pod.Allocations[container].RawAllocationOnly.RAMBytesUsageMax = res.Values[0].Value
  965. }
  966. }
  967. }
  968. func applyGPUsAllocated(podMap map[podKey]*Pod, resGPUsRequested []*prom.QueryResult, resGPUsAllocated []*prom.QueryResult) {
  969. if len(resGPUsAllocated) > 0 { // Use the new query, when it's become available in a window
  970. resGPUsRequested = resGPUsAllocated
  971. }
  972. for _, res := range resGPUsRequested {
  973. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  974. if err != nil {
  975. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
  976. continue
  977. }
  978. pod, ok := podMap[key]
  979. if !ok {
  980. continue
  981. }
  982. container, err := res.GetString("container")
  983. if err != nil {
  984. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request query result missing 'container': %s", key)
  985. continue
  986. }
  987. if _, ok := pod.Allocations[container]; !ok {
  988. pod.AppendContainer(container)
  989. }
  990. hrs := pod.Allocations[container].Minutes() / 60.0
  991. pod.Allocations[container].GPUHours = res.Values[0].Value * hrs
  992. }
  993. }
  994. func applyNetworkTotals(podMap map[podKey]*Pod, resNetworkTransferBytes []*prom.QueryResult, resNetworkReceiveBytes []*prom.QueryResult) {
  995. for _, res := range resNetworkTransferBytes {
  996. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  997. if err != nil {
  998. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Transfer Bytes query result missing field: %s", err)
  999. continue
  1000. }
  1001. pod, ok := podMap[podKey]
  1002. if !ok {
  1003. continue
  1004. }
  1005. for _, alloc := range pod.Allocations {
  1006. alloc.NetworkTransferBytes = res.Values[0].Value / float64(len(pod.Allocations))
  1007. }
  1008. }
  1009. for _, res := range resNetworkReceiveBytes {
  1010. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1011. if err != nil {
  1012. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Receive Bytes query result missing field: %s", err)
  1013. continue
  1014. }
  1015. pod, ok := podMap[podKey]
  1016. if !ok {
  1017. continue
  1018. }
  1019. for _, alloc := range pod.Allocations {
  1020. alloc.NetworkReceiveBytes = res.Values[0].Value / float64(len(pod.Allocations))
  1021. }
  1022. }
  1023. }
  1024. func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult) {
  1025. costPerGiBByCluster := map[string]float64{}
  1026. for _, res := range resNetworkCostPerGiB {
  1027. cluster, err := res.GetString(env.GetPromClusterLabel())
  1028. if err != nil {
  1029. cluster = env.GetClusterID()
  1030. }
  1031. costPerGiBByCluster[cluster] = res.Values[0].Value
  1032. }
  1033. for _, res := range resNetworkGiB {
  1034. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1035. if err != nil {
  1036. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
  1037. continue
  1038. }
  1039. pod, ok := podMap[podKey]
  1040. if !ok {
  1041. continue
  1042. }
  1043. for _, alloc := range pod.Allocations {
  1044. gib := res.Values[0].Value / float64(len(pod.Allocations))
  1045. costPerGiB := costPerGiBByCluster[podKey.Cluster]
  1046. alloc.NetworkCost = gib * costPerGiB
  1047. }
  1048. }
  1049. }
  1050. func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[namespaceKey]map[string]string {
  1051. namespaceLabels := map[namespaceKey]map[string]string{}
  1052. for _, res := range resNamespaceLabels {
  1053. nsKey, err := resultNamespaceKey(res, env.GetPromClusterLabel(), "namespace")
  1054. if err != nil {
  1055. continue
  1056. }
  1057. if _, ok := namespaceLabels[nsKey]; !ok {
  1058. namespaceLabels[nsKey] = map[string]string{}
  1059. }
  1060. for k, l := range res.GetLabels() {
  1061. namespaceLabels[nsKey][k] = l
  1062. }
  1063. }
  1064. return namespaceLabels
  1065. }
  1066. func resToPodLabels(resPodLabels []*prom.QueryResult) map[podKey]map[string]string {
  1067. podLabels := map[podKey]map[string]string{}
  1068. for _, res := range resPodLabels {
  1069. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1070. if err != nil {
  1071. continue
  1072. }
  1073. if _, ok := podLabels[podKey]; !ok {
  1074. podLabels[podKey] = map[string]string{}
  1075. }
  1076. for k, l := range res.GetLabels() {
  1077. podLabels[podKey][k] = l
  1078. }
  1079. }
  1080. return podLabels
  1081. }
  1082. func resToNamespaceAnnotations(resNamespaceAnnotations []*prom.QueryResult) map[string]map[string]string {
  1083. namespaceAnnotations := map[string]map[string]string{}
  1084. for _, res := range resNamespaceAnnotations {
  1085. namespace, err := res.GetString("namespace")
  1086. if err != nil {
  1087. continue
  1088. }
  1089. if _, ok := namespaceAnnotations[namespace]; !ok {
  1090. namespaceAnnotations[namespace] = map[string]string{}
  1091. }
  1092. for k, l := range res.GetAnnotations() {
  1093. namespaceAnnotations[namespace][k] = l
  1094. }
  1095. }
  1096. return namespaceAnnotations
  1097. }
  1098. func resToPodAnnotations(resPodAnnotations []*prom.QueryResult) map[podKey]map[string]string {
  1099. podAnnotations := map[podKey]map[string]string{}
  1100. for _, res := range resPodAnnotations {
  1101. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1102. if err != nil {
  1103. continue
  1104. }
  1105. if _, ok := podAnnotations[podKey]; !ok {
  1106. podAnnotations[podKey] = map[string]string{}
  1107. }
  1108. for k, l := range res.GetAnnotations() {
  1109. podAnnotations[podKey][k] = l
  1110. }
  1111. }
  1112. return podAnnotations
  1113. }
  1114. func applyLabels(podMap map[podKey]*Pod, namespaceLabels map[namespaceKey]map[string]string, podLabels map[podKey]map[string]string) {
  1115. for podKey, pod := range podMap {
  1116. for _, alloc := range pod.Allocations {
  1117. allocLabels := alloc.Properties.Labels
  1118. if allocLabels == nil {
  1119. allocLabels = make(map[string]string)
  1120. }
  1121. // Apply namespace labels first, then pod labels so that pod labels
  1122. // overwrite namespace labels.
  1123. nsKey := podKey.namespaceKey // newNamespaceKey(podKey.Cluster, podKey.Namespace)
  1124. if labels, ok := namespaceLabels[nsKey]; ok {
  1125. for k, v := range labels {
  1126. allocLabels[k] = v
  1127. }
  1128. }
  1129. if labels, ok := podLabels[podKey]; ok {
  1130. for k, v := range labels {
  1131. allocLabels[k] = v
  1132. }
  1133. }
  1134. alloc.Properties.Labels = allocLabels
  1135. }
  1136. }
  1137. }
  1138. func applyAnnotations(podMap map[podKey]*Pod, namespaceAnnotations map[string]map[string]string, podAnnotations map[podKey]map[string]string) {
  1139. for key, pod := range podMap {
  1140. for _, alloc := range pod.Allocations {
  1141. allocAnnotations := alloc.Properties.Annotations
  1142. if allocAnnotations == nil {
  1143. allocAnnotations = make(map[string]string)
  1144. }
  1145. // Apply namespace annotations first, then pod annotations so that
  1146. // pod labels overwrite namespace labels.
  1147. if labels, ok := namespaceAnnotations[key.Namespace]; ok {
  1148. for k, v := range labels {
  1149. allocAnnotations[k] = v
  1150. }
  1151. }
  1152. if labels, ok := podAnnotations[key]; ok {
  1153. for k, v := range labels {
  1154. allocAnnotations[k] = v
  1155. }
  1156. }
  1157. alloc.Properties.Annotations = allocAnnotations
  1158. }
  1159. }
  1160. }
  1161. func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[string]string {
  1162. serviceLabels := map[serviceKey]map[string]string{}
  1163. for _, res := range resServiceLabels {
  1164. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service")
  1165. if err != nil {
  1166. continue
  1167. }
  1168. if _, ok := serviceLabels[serviceKey]; !ok {
  1169. serviceLabels[serviceKey] = map[string]string{}
  1170. }
  1171. for k, l := range res.GetLabels() {
  1172. serviceLabels[serviceKey][k] = l
  1173. }
  1174. }
  1175. // Prune duplicate services. That is, if the same service exists with
  1176. // hyphens instead of underscores, keep the one that uses hyphens.
  1177. for key := range serviceLabels {
  1178. if strings.Contains(key.Service, "_") {
  1179. duplicateService := strings.Replace(key.Service, "_", "-", -1)
  1180. duplicateKey := newServiceKey(key.Cluster, key.Namespace, duplicateService)
  1181. if _, ok := serviceLabels[duplicateKey]; ok {
  1182. delete(serviceLabels, key)
  1183. }
  1184. }
  1185. }
  1186. return serviceLabels
  1187. }
  1188. func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  1189. deploymentLabels := map[controllerKey]map[string]string{}
  1190. for _, res := range resDeploymentLabels {
  1191. controllerKey, err := resultDeploymentKey(res, env.GetPromClusterLabel(), "namespace", "deployment")
  1192. if err != nil {
  1193. continue
  1194. }
  1195. if _, ok := deploymentLabels[controllerKey]; !ok {
  1196. deploymentLabels[controllerKey] = map[string]string{}
  1197. }
  1198. for k, l := range res.GetLabels() {
  1199. deploymentLabels[controllerKey][k] = l
  1200. }
  1201. }
  1202. // Prune duplicate deployments. That is, if the same deployment exists with
  1203. // hyphens instead of underscores, keep the one that uses hyphens.
  1204. for key := range deploymentLabels {
  1205. if strings.Contains(key.Controller, "_") {
  1206. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  1207. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  1208. if _, ok := deploymentLabels[duplicateKey]; ok {
  1209. delete(deploymentLabels, key)
  1210. }
  1211. }
  1212. }
  1213. return deploymentLabels
  1214. }
  1215. func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  1216. statefulSetLabels := map[controllerKey]map[string]string{}
  1217. for _, res := range resStatefulSetLabels {
  1218. controllerKey, err := resultStatefulSetKey(res, env.GetPromClusterLabel(), "namespace", "statefulSet")
  1219. if err != nil {
  1220. continue
  1221. }
  1222. if _, ok := statefulSetLabels[controllerKey]; !ok {
  1223. statefulSetLabels[controllerKey] = map[string]string{}
  1224. }
  1225. for k, l := range res.GetLabels() {
  1226. statefulSetLabels[controllerKey][k] = l
  1227. }
  1228. }
  1229. // Prune duplicate stateful sets. That is, if the same stateful set exists
  1230. // with hyphens instead of underscores, keep the one that uses hyphens.
  1231. for key := range statefulSetLabels {
  1232. if strings.Contains(key.Controller, "_") {
  1233. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  1234. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  1235. if _, ok := statefulSetLabels[duplicateKey]; ok {
  1236. delete(statefulSetLabels, key)
  1237. }
  1238. }
  1239. }
  1240. return statefulSetLabels
  1241. }
  1242. func labelsToPodControllerMap(podLabels map[podKey]map[string]string, controllerLabels map[controllerKey]map[string]string) map[podKey]controllerKey {
  1243. podControllerMap := map[podKey]controllerKey{}
  1244. // For each controller, turn the labels into a selector and attempt to
  1245. // match it with each set of pod labels. A match indicates that the pod
  1246. // belongs to the controller.
  1247. for cKey, cLabels := range controllerLabels {
  1248. selector := labels.Set(cLabels).AsSelectorPreValidated()
  1249. for pKey, pLabels := range podLabels {
  1250. // If the pod is in a different cluster or namespace, there is
  1251. // no need to compare the labels.
  1252. if cKey.Cluster != pKey.Cluster || cKey.Namespace != pKey.Namespace {
  1253. continue
  1254. }
  1255. podLabelSet := labels.Set(pLabels)
  1256. if selector.Matches(podLabelSet) {
  1257. if _, ok := podControllerMap[pKey]; ok {
  1258. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PodControllerMap match already exists: %s matches %s and %s", pKey, podControllerMap[pKey], cKey)
  1259. }
  1260. podControllerMap[pKey] = cKey
  1261. }
  1262. }
  1263. }
  1264. return podControllerMap
  1265. }
  1266. func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult) map[podKey]controllerKey {
  1267. daemonSetLabels := map[podKey]controllerKey{}
  1268. for _, res := range resDaemonSetLabels {
  1269. controllerKey, err := resultDaemonSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1270. if err != nil {
  1271. continue
  1272. }
  1273. pod, err := res.GetString("pod")
  1274. if err != nil {
  1275. log.Warningf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
  1276. }
  1277. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1278. daemonSetLabels[podKey] = controllerKey
  1279. }
  1280. return daemonSetLabels
  1281. }
  1282. func resToPodJobMap(resJobLabels []*prom.QueryResult) map[podKey]controllerKey {
  1283. jobLabels := map[podKey]controllerKey{}
  1284. for _, res := range resJobLabels {
  1285. controllerKey, err := resultJobKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1286. if err != nil {
  1287. continue
  1288. }
  1289. // Convert the name of Jobs generated by CronJobs to the name of the
  1290. // CronJob by stripping the timestamp off the end.
  1291. match := isCron.FindStringSubmatch(controllerKey.Controller)
  1292. if match != nil {
  1293. controllerKey.Controller = match[1]
  1294. }
  1295. pod, err := res.GetString("pod")
  1296. if err != nil {
  1297. log.Warningf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
  1298. }
  1299. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1300. jobLabels[podKey] = controllerKey
  1301. }
  1302. return jobLabels
  1303. }
  1304. func resToPodReplicaSetMap(resPodsWithReplicaSetOwner []*prom.QueryResult, resReplicaSetsWithoutOwners []*prom.QueryResult) map[podKey]controllerKey {
  1305. // Build out set of ReplicaSets that have no owners, themselves, such that
  1306. // the ReplicaSet should be used as the owner of the Pods it controls.
  1307. // (This should exclude, for example, ReplicaSets that are controlled by
  1308. // Deployments, in which case the Deployment should be the Pod's owner.)
  1309. replicaSets := map[controllerKey]struct{}{}
  1310. for _, res := range resReplicaSetsWithoutOwners {
  1311. controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "replicaset")
  1312. if err != nil {
  1313. continue
  1314. }
  1315. replicaSets[controllerKey] = struct{}{}
  1316. }
  1317. // Create the mapping of Pods to ReplicaSets, ignoring any ReplicaSets that
  1318. // to not appear in the set of uncontrolled ReplicaSets above.
  1319. podToReplicaSet := map[podKey]controllerKey{}
  1320. for _, res := range resPodsWithReplicaSetOwner {
  1321. controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1322. if err != nil {
  1323. continue
  1324. }
  1325. if _, ok := replicaSets[controllerKey]; !ok {
  1326. continue
  1327. }
  1328. pod, err := res.GetString("pod")
  1329. if err != nil {
  1330. log.Warningf("CostModel.ComputeAllocation: ReplicaSet result without pod: %s", controllerKey)
  1331. }
  1332. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1333. podToReplicaSet[podKey] = controllerKey
  1334. }
  1335. return podToReplicaSet
  1336. }
  1337. func applyServicesToPods(podMap map[podKey]*Pod, podLabels map[podKey]map[string]string, allocsByService map[serviceKey][]*kubecost.Allocation, serviceLabels map[serviceKey]map[string]string) {
  1338. podServicesMap := map[podKey][]serviceKey{}
  1339. // For each service, turn the labels into a selector and attempt to
  1340. // match it with each set of pod labels. A match indicates that the pod
  1341. // belongs to the service.
  1342. for sKey, sLabels := range serviceLabels {
  1343. selector := labels.Set(sLabels).AsSelectorPreValidated()
  1344. for pKey, pLabels := range podLabels {
  1345. // If the pod is in a different cluster or namespace, there is
  1346. // no need to compare the labels.
  1347. if sKey.Cluster != pKey.Cluster || sKey.Namespace != pKey.Namespace {
  1348. continue
  1349. }
  1350. podLabelSet := labels.Set(pLabels)
  1351. if selector.Matches(podLabelSet) {
  1352. if _, ok := podServicesMap[pKey]; !ok {
  1353. podServicesMap[pKey] = []serviceKey{}
  1354. }
  1355. podServicesMap[pKey] = append(podServicesMap[pKey], sKey)
  1356. }
  1357. }
  1358. }
  1359. // For each allocation in each pod, attempt to find and apply the list of
  1360. // services associated with the allocation's pod.
  1361. for key, pod := range podMap {
  1362. for _, alloc := range pod.Allocations {
  1363. if sKeys, ok := podServicesMap[key]; ok {
  1364. services := []string{}
  1365. for _, sKey := range sKeys {
  1366. services = append(services, sKey.Service)
  1367. allocsByService[sKey] = append(allocsByService[sKey], alloc)
  1368. }
  1369. alloc.Properties.Services = services
  1370. }
  1371. }
  1372. }
  1373. }
  1374. func applyControllersToPods(podMap map[podKey]*Pod, podControllerMap map[podKey]controllerKey) {
  1375. for key, pod := range podMap {
  1376. for _, alloc := range pod.Allocations {
  1377. if controllerKey, ok := podControllerMap[key]; ok {
  1378. alloc.Properties.ControllerKind = controllerKey.ControllerKind
  1379. alloc.Properties.Controller = controllerKey.Controller
  1380. }
  1381. }
  1382. }
  1383. }
  1384. func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult) {
  1385. for _, res := range resNodeCostPerCPUHr {
  1386. cluster, err := res.GetString(env.GetPromClusterLabel())
  1387. if err != nil {
  1388. cluster = env.GetClusterID()
  1389. }
  1390. node, err := res.GetString("node")
  1391. if err != nil {
  1392. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1393. continue
  1394. }
  1395. instanceType, err := res.GetString("instance_type")
  1396. if err != nil {
  1397. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1398. continue
  1399. }
  1400. providerID, err := res.GetString("provider_id")
  1401. if err != nil {
  1402. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1403. continue
  1404. }
  1405. key := newNodeKey(cluster, node)
  1406. if _, ok := nodeMap[key]; !ok {
  1407. nodeMap[key] = &NodePricing{
  1408. Name: node,
  1409. NodeType: instanceType,
  1410. ProviderID: cloud.ParseID(providerID),
  1411. }
  1412. }
  1413. nodeMap[key].CostPerCPUHr = res.Values[0].Value
  1414. }
  1415. }
  1416. func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult) {
  1417. for _, res := range resNodeCostPerRAMGiBHr {
  1418. cluster, err := res.GetString(env.GetPromClusterLabel())
  1419. if err != nil {
  1420. cluster = env.GetClusterID()
  1421. }
  1422. node, err := res.GetString("node")
  1423. if err != nil {
  1424. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1425. continue
  1426. }
  1427. instanceType, err := res.GetString("instance_type")
  1428. if err != nil {
  1429. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1430. continue
  1431. }
  1432. providerID, err := res.GetString("provider_id")
  1433. if err != nil {
  1434. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1435. continue
  1436. }
  1437. key := newNodeKey(cluster, node)
  1438. if _, ok := nodeMap[key]; !ok {
  1439. nodeMap[key] = &NodePricing{
  1440. Name: node,
  1441. NodeType: instanceType,
  1442. ProviderID: cloud.ParseID(providerID),
  1443. }
  1444. }
  1445. nodeMap[key].CostPerRAMGiBHr = res.Values[0].Value
  1446. }
  1447. }
  1448. func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult) {
  1449. for _, res := range resNodeCostPerGPUHr {
  1450. cluster, err := res.GetString(env.GetPromClusterLabel())
  1451. if err != nil {
  1452. cluster = env.GetClusterID()
  1453. }
  1454. node, err := res.GetString("node")
  1455. if err != nil {
  1456. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1457. continue
  1458. }
  1459. instanceType, err := res.GetString("instance_type")
  1460. if err != nil {
  1461. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1462. continue
  1463. }
  1464. providerID, err := res.GetString("provider_id")
  1465. if err != nil {
  1466. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1467. continue
  1468. }
  1469. key := newNodeKey(cluster, node)
  1470. if _, ok := nodeMap[key]; !ok {
  1471. nodeMap[key] = &NodePricing{
  1472. Name: node,
  1473. NodeType: instanceType,
  1474. ProviderID: cloud.ParseID(providerID),
  1475. }
  1476. }
  1477. nodeMap[key].CostPerGPUHr = res.Values[0].Value
  1478. }
  1479. }
  1480. func applyNodeSpot(nodeMap map[nodeKey]*NodePricing, resNodeIsSpot []*prom.QueryResult) {
  1481. for _, res := range resNodeIsSpot {
  1482. cluster, err := res.GetString(env.GetPromClusterLabel())
  1483. if err != nil {
  1484. cluster = env.GetClusterID()
  1485. }
  1486. node, err := res.GetString("node")
  1487. if err != nil {
  1488. log.Warningf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
  1489. continue
  1490. }
  1491. key := newNodeKey(cluster, node)
  1492. if _, ok := nodeMap[key]; !ok {
  1493. log.Warningf("CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
  1494. continue
  1495. }
  1496. nodeMap[key].Preemptible = res.Values[0].Value > 0
  1497. }
  1498. }
  1499. func applyNodeDiscount(nodeMap map[nodeKey]*NodePricing, cm *CostModel) {
  1500. if cm == nil {
  1501. return
  1502. }
  1503. c, err := cm.Provider.GetConfig()
  1504. if err != nil {
  1505. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1506. return
  1507. }
  1508. discount, err := ParsePercentString(c.Discount)
  1509. if err != nil {
  1510. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1511. return
  1512. }
  1513. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1514. if err != nil {
  1515. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1516. return
  1517. }
  1518. for _, node := range nodeMap {
  1519. // TODO GKE Reserved Instances into account
  1520. node.Discount = cm.Provider.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  1521. node.CostPerCPUHr *= (1.0 - node.Discount)
  1522. node.CostPerRAMGiBHr *= (1.0 - node.Discount)
  1523. }
  1524. }
  1525. func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
  1526. for _, res := range resPVCostPerGiBHour {
  1527. cluster, err := res.GetString(env.GetPromClusterLabel())
  1528. if err != nil {
  1529. cluster = env.GetClusterID()
  1530. }
  1531. name, err := res.GetString("volumename")
  1532. if err != nil {
  1533. log.Warningf("CostModel.ComputeAllocation: PV cost without volumename")
  1534. continue
  1535. }
  1536. key := newPVKey(cluster, name)
  1537. pvMap[key] = &PV{
  1538. Cluster: cluster,
  1539. Name: name,
  1540. CostPerGiBHour: res.Values[0].Value,
  1541. }
  1542. }
  1543. }
  1544. func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
  1545. for _, res := range resPVBytes {
  1546. key, err := resultPVKey(res, env.GetPromClusterLabel(), "persistentvolume")
  1547. if err != nil {
  1548. log.Warningf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
  1549. continue
  1550. }
  1551. if _, ok := pvMap[key]; !ok {
  1552. log.Warningf("CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
  1553. continue
  1554. }
  1555. pvMap[key].Bytes = res.Values[0].Value
  1556. }
  1557. }
  1558. func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey]*PV, resPVCInfo []*prom.QueryResult) {
  1559. for _, res := range resPVCInfo {
  1560. cluster, err := res.GetString(env.GetPromClusterLabel())
  1561. if err != nil {
  1562. cluster = env.GetClusterID()
  1563. }
  1564. values, err := res.GetStrings("persistentvolumeclaim", "storageclass", "volumename", "namespace")
  1565. if err != nil {
  1566. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC info query result missing field: %s", err)
  1567. continue
  1568. }
  1569. namespace := values["namespace"]
  1570. name := values["persistentvolumeclaim"]
  1571. volume := values["volumename"]
  1572. storageClass := values["storageclass"]
  1573. pvKey := newPVKey(cluster, volume)
  1574. pvcKey := newPVCKey(cluster, namespace, name)
  1575. // pvcStart and pvcEnd are the timestamps of the first and last minutes
  1576. // the PVC was running, respectively. We subtract 1m from pvcStart
  1577. // because this point will actually represent the end of the first
  1578. // minute. We don't subtract from pvcEnd because it already represents
  1579. // the end of the last minute.
  1580. var pvcStart, pvcEnd time.Time
  1581. for _, datum := range res.Values {
  1582. t := time.Unix(int64(datum.Timestamp), 0)
  1583. if pvcStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  1584. pvcStart = t
  1585. }
  1586. if datum.Value > 0 && window.Contains(t) {
  1587. pvcEnd = t
  1588. }
  1589. }
  1590. if pvcStart.IsZero() || pvcEnd.IsZero() {
  1591. log.Warningf("CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
  1592. }
  1593. pvcStart = pvcStart.Add(-time.Minute)
  1594. if _, ok := pvMap[pvKey]; !ok {
  1595. continue
  1596. }
  1597. pvMap[pvKey].StorageClass = storageClass
  1598. if _, ok := pvcMap[pvcKey]; !ok {
  1599. pvcMap[pvcKey] = &PVC{}
  1600. }
  1601. pvcMap[pvcKey].Name = name
  1602. pvcMap[pvcKey].Namespace = namespace
  1603. pvcMap[pvcKey].Volume = pvMap[pvKey]
  1604. pvcMap[pvcKey].Start = pvcStart
  1605. pvcMap[pvcKey].End = pvcEnd
  1606. }
  1607. }
  1608. func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom.QueryResult) {
  1609. for _, res := range resPVCBytesRequested {
  1610. key, err := resultPVCKey(res, env.GetPromClusterLabel(), "namespace", "persistentvolumeclaim")
  1611. if err != nil {
  1612. continue
  1613. }
  1614. if _, ok := pvcMap[key]; !ok {
  1615. continue
  1616. }
  1617. pvcMap[key].Bytes = res.Values[0].Value
  1618. }
  1619. }
  1620. func buildPodPVCMap(podPVCMap map[podKey][]*PVC, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC, podMap map[podKey]*Pod, resPodPVCAllocation []*prom.QueryResult) {
  1621. for _, res := range resPodPVCAllocation {
  1622. cluster, err := res.GetString(env.GetPromClusterLabel())
  1623. if err != nil {
  1624. cluster = env.GetClusterID()
  1625. }
  1626. values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace")
  1627. if err != nil {
  1628. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC allocation query result missing field: %s", err)
  1629. continue
  1630. }
  1631. namespace := values["namespace"]
  1632. pod := values["pod"]
  1633. name := values["persistentvolumeclaim"]
  1634. volume := values["persistentvolume"]
  1635. podKey := newPodKey(cluster, namespace, pod)
  1636. pvKey := newPVKey(cluster, volume)
  1637. pvcKey := newPVCKey(cluster, namespace, name)
  1638. if _, ok := pvMap[pvKey]; !ok {
  1639. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PV missing for PVC allocation query result: %s", pvKey)
  1640. continue
  1641. }
  1642. if _, ok := podPVCMap[podKey]; !ok {
  1643. podPVCMap[podKey] = []*PVC{}
  1644. }
  1645. pvc, ok := pvcMap[pvcKey]
  1646. if !ok {
  1647. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC missing for PVC allocation query: %s", pvcKey)
  1648. continue
  1649. }
  1650. count := 1
  1651. if pod, ok := podMap[podKey]; ok && len(pod.Allocations) > 0 {
  1652. count = len(pod.Allocations)
  1653. } else {
  1654. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC %s for missing pod %s", pvcKey, podKey)
  1655. }
  1656. pvc.Count = count
  1657. pvc.Mounted = true
  1658. podPVCMap[podKey] = append(podPVCMap[podKey], pvc)
  1659. }
  1660. }
  1661. func applyUnmountedPVs(window kubecost.Window, podMap map[podKey]*Pod, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC) {
  1662. unmountedPVBytes := map[string]float64{}
  1663. unmountedPVCost := map[string]float64{}
  1664. for _, pv := range pvMap {
  1665. mounted := false
  1666. for _, pvc := range pvcMap {
  1667. if pvc.Volume == nil {
  1668. continue
  1669. }
  1670. if pvc.Volume == pv {
  1671. mounted = true
  1672. break
  1673. }
  1674. }
  1675. if !mounted {
  1676. gib := pv.Bytes / 1024 / 1024 / 1024
  1677. hrs := window.Minutes() / 60.0 // TODO improve with PV hours, not window hours
  1678. cost := pv.CostPerGiBHour * gib * hrs
  1679. unmountedPVCost[pv.Cluster] += cost
  1680. unmountedPVBytes[pv.Cluster] += pv.Bytes
  1681. }
  1682. }
  1683. for cluster, amount := range unmountedPVCost {
  1684. container := kubecost.UnmountedSuffix
  1685. pod := kubecost.UnmountedSuffix
  1686. namespace := kubecost.UnmountedSuffix
  1687. node := ""
  1688. key := newPodKey(cluster, namespace, pod)
  1689. podMap[key] = &Pod{
  1690. Window: window.Clone(),
  1691. Start: *window.Start(),
  1692. End: *window.End(),
  1693. Key: key,
  1694. Allocations: map[string]*kubecost.Allocation{},
  1695. }
  1696. podMap[key].AppendContainer(container)
  1697. podMap[key].Allocations[container].Properties.Cluster = cluster
  1698. podMap[key].Allocations[container].Properties.Node = node
  1699. podMap[key].Allocations[container].Properties.Namespace = namespace
  1700. podMap[key].Allocations[container].Properties.Pod = pod
  1701. podMap[key].Allocations[container].Properties.Container = container
  1702. pvKey := kubecost.PVKey{
  1703. Cluster: cluster,
  1704. Name: kubecost.UnmountedSuffix,
  1705. }
  1706. unmountedPVs := kubecost.PVAllocations{
  1707. pvKey: {
  1708. ByteHours: unmountedPVBytes[cluster] * window.Minutes() / 60.0,
  1709. Cost: amount,
  1710. },
  1711. }
  1712. podMap[key].Allocations[container].PVs = podMap[key].Allocations[container].PVs.Add(unmountedPVs)
  1713. }
  1714. }
  1715. func applyUnmountedPVCs(window kubecost.Window, podMap map[podKey]*Pod, pvcMap map[pvcKey]*PVC) {
  1716. unmountedPVCBytes := map[namespaceKey]float64{}
  1717. unmountedPVCCost := map[namespaceKey]float64{}
  1718. for _, pvc := range pvcMap {
  1719. if !pvc.Mounted && pvc.Volume != nil {
  1720. key := newNamespaceKey(pvc.Cluster, pvc.Namespace)
  1721. gib := pvc.Volume.Bytes / 1024 / 1024 / 1024
  1722. hrs := pvc.Minutes() / 60.0
  1723. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  1724. unmountedPVCCost[key] += cost
  1725. unmountedPVCBytes[key] += pvc.Volume.Bytes
  1726. }
  1727. }
  1728. for key, amount := range unmountedPVCCost {
  1729. container := kubecost.UnmountedSuffix
  1730. pod := kubecost.UnmountedSuffix
  1731. namespace := key.Namespace
  1732. node := ""
  1733. cluster := key.Cluster
  1734. podKey := newPodKey(cluster, namespace, pod)
  1735. podMap[podKey] = &Pod{
  1736. Window: window.Clone(),
  1737. Start: *window.Start(),
  1738. End: *window.End(),
  1739. Key: podKey,
  1740. Allocations: map[string]*kubecost.Allocation{},
  1741. }
  1742. podMap[podKey].AppendContainer(container)
  1743. podMap[podKey].Allocations[container].Properties.Cluster = cluster
  1744. podMap[podKey].Allocations[container].Properties.Node = node
  1745. podMap[podKey].Allocations[container].Properties.Namespace = namespace
  1746. podMap[podKey].Allocations[container].Properties.Pod = pod
  1747. podMap[podKey].Allocations[container].Properties.Container = container
  1748. pvKey := kubecost.PVKey{
  1749. Cluster: cluster,
  1750. Name: kubecost.UnmountedSuffix,
  1751. }
  1752. unmountedPVs := kubecost.PVAllocations{
  1753. pvKey: {
  1754. ByteHours: unmountedPVCBytes[key] * window.Minutes() / 60.0,
  1755. Cost: amount,
  1756. },
  1757. }
  1758. podMap[podKey].Allocations[container].PVs = podMap[podKey].Allocations[container].PVs.Add(unmountedPVs)
  1759. }
  1760. }
  // LB describes a load balancer's active interval and the total cost accrued
// over that interval.
type LB struct {
	// TotalCost is the cost over [Start, End]. getLoadBalancerCosts computes
	// it as the hourly price times the active hours.
	TotalCost float64
	// Start is the beginning of the active interval.
	Start time.Time
	// End is the end of the active interval.
	End time.Time
}
  1767. func getLoadBalancerCosts(resLBCost, resLBActiveMins []*prom.QueryResult, resolution time.Duration) map[serviceKey]*LB {
  1768. lbMap := make(map[serviceKey]*LB)
  1769. lbHourlyCosts := make(map[serviceKey]float64)
  1770. for _, res := range resLBCost {
  1771. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
  1772. if err != nil {
  1773. continue
  1774. }
  1775. lbHourlyCosts[serviceKey] = res.Values[0].Value
  1776. }
  1777. for _, res := range resLBActiveMins {
  1778. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
  1779. if err != nil || len(res.Values) == 0 {
  1780. continue
  1781. }
  1782. if _, ok := lbHourlyCosts[serviceKey]; !ok {
  1783. log.Warningf("CostModel: failed to find hourly cost for Load Balancer: %v", serviceKey)
  1784. continue
  1785. }
  1786. s := time.Unix(int64(res.Values[0].Timestamp), 0)
  1787. // subtract resolution from start time to cover full time period
  1788. s = s.Add(-resolution)
  1789. e := time.Unix(int64(res.Values[len(res.Values)-1].Timestamp), 0)
  1790. hours := e.Sub(s).Hours()
  1791. lbMap[serviceKey] = &LB{
  1792. TotalCost: lbHourlyCosts[serviceKey] * hours,
  1793. Start: s,
  1794. End: e,
  1795. }
  1796. }
  1797. return lbMap
  1798. }
  1799. func applyLoadBalancersToPods(lbMap map[serviceKey]*LB, allocsByService map[serviceKey][]*kubecost.Allocation) {
  1800. for sKey, lb := range lbMap {
  1801. totalHours := 0.0
  1802. allocHours := make(map[*kubecost.Allocation]float64)
  1803. // Add portion of load balancing cost to each allocation
  1804. // proportional to the total number of hours allocations used the load balancer
  1805. for _, alloc := range allocsByService[sKey] {
  1806. // Determine the (start, end) of the relationship between the
  1807. // given LB and the associated Allocation so that a precise
  1808. // number of hours can be used to compute cumulative cost.
  1809. s, e := alloc.Start, alloc.End
  1810. if lb.Start.After(alloc.Start) {
  1811. s = lb.Start
  1812. }
  1813. if lb.End.Before(alloc.End) {
  1814. e = lb.End
  1815. }
  1816. hours := e.Sub(s).Hours()
  1817. // A negative number of hours signifies no overlap between the windows
  1818. if hours > 0 {
  1819. totalHours += hours
  1820. allocHours[alloc] = hours
  1821. }
  1822. }
  1823. // Distribute cost of service once total hours is calculated
  1824. for alloc, hours := range allocHours {
  1825. alloc.LoadBalancerCost += lb.TotalCost * hours / totalHours
  1826. }
  1827. }
  1828. }
  1829. // getNodePricing determines node pricing, given a key and a mapping from keys
  1830. // to their NodePricing instances, as well as the custom pricing configuration
  1831. // inherent to the CostModel instance. If custom pricing is set, use that. If
  1832. // not, use the pricing defined by the given key. If that doesn't exist, fall
  1833. // back on custom pricing as a default.
  1834. func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*NodePricing, nodeKey nodeKey) *NodePricing {
  1835. // Find the relevant NodePricing, if it exists. If not, substitute the
  1836. // custom NodePricing as a default.
  1837. node, ok := nodeMap[nodeKey]
  1838. if !ok || node == nil {
  1839. if nodeKey.Node != "" {
  1840. log.DedupedWarningf(5, "CostModel: failed to find node for %s", nodeKey)
  1841. }
  1842. return cm.getCustomNodePricing(false)
  1843. }
  1844. // If custom pricing is enabled and can be retrieved, override detected
  1845. // node pricing with the custom values.
  1846. customPricingConfig, err := cm.Provider.GetConfig()
  1847. if err != nil {
  1848. log.Warningf("CostModel: failed to load custom pricing: %s", err)
  1849. }
  1850. if cloud.CustomPricesEnabled(cm.Provider) && customPricingConfig != nil {
  1851. return cm.getCustomNodePricing(node.Preemptible)
  1852. }
  1853. node.Source = "prometheus"
  1854. // If any of the values are NaN or zero, replace them with the custom
  1855. // values as default.
  1856. // TODO:CLEANUP can't we parse these custom prices once? why do we store
  1857. // them as strings like this?
  1858. if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
  1859. log.Warningf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
  1860. cpuCostStr := customPricingConfig.CPU
  1861. if node.Preemptible {
  1862. cpuCostStr = customPricingConfig.SpotCPU
  1863. }
  1864. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1865. if err != nil {
  1866. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1867. }
  1868. node.CostPerCPUHr = costPerCPUHr
  1869. node.Source += "/customCPU"
  1870. }
  1871. if math.IsNaN(node.CostPerGPUHr) {
  1872. log.Warningf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
  1873. gpuCostStr := customPricingConfig.GPU
  1874. if node.Preemptible {
  1875. gpuCostStr = customPricingConfig.SpotGPU
  1876. }
  1877. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1878. if err != nil {
  1879. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1880. }
  1881. node.CostPerGPUHr = costPerGPUHr
  1882. node.Source += "/customGPU"
  1883. }
  1884. if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
  1885. log.Warningf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
  1886. ramCostStr := customPricingConfig.RAM
  1887. if node.Preemptible {
  1888. ramCostStr = customPricingConfig.SpotRAM
  1889. }
  1890. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1891. if err != nil {
  1892. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1893. }
  1894. node.CostPerRAMGiBHr = costPerRAMHr
  1895. node.Source += "/customRAM"
  1896. }
  1897. return node
  1898. }
  1899. // getCustomNodePricing converts the CostModel's configured custom pricing
  1900. // values into a NodePricing instance.
  1901. func (cm *CostModel) getCustomNodePricing(spot bool) *NodePricing {
  1902. customPricingConfig, err := cm.Provider.GetConfig()
  1903. if err != nil {
  1904. return nil
  1905. }
  1906. cpuCostStr := customPricingConfig.CPU
  1907. gpuCostStr := customPricingConfig.GPU
  1908. ramCostStr := customPricingConfig.RAM
  1909. if spot {
  1910. cpuCostStr = customPricingConfig.SpotCPU
  1911. gpuCostStr = customPricingConfig.SpotGPU
  1912. ramCostStr = customPricingConfig.SpotRAM
  1913. }
  1914. node := &NodePricing{Source: "custom"}
  1915. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1916. if err != nil {
  1917. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1918. }
  1919. node.CostPerCPUHr = costPerCPUHr
  1920. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1921. if err != nil {
  1922. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1923. }
  1924. node.CostPerGPUHr = costPerGPUHr
  1925. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1926. if err != nil {
  1927. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1928. }
  1929. node.CostPerRAMGiBHr = costPerRAMHr
  1930. return node
  1931. }
  1932. // NodePricing describes the resource costs associated with a given node, as
  1933. // well as the source of the information (e.g. prometheus, custom)
  1934. type NodePricing struct {
  1935. Name string
  1936. NodeType string
  1937. ProviderID string
  1938. Preemptible bool
  1939. CostPerCPUHr float64
  1940. CostPerRAMGiBHr float64
  1941. CostPerGPUHr float64
  1942. Discount float64
  1943. Source string
  1944. }
  1945. // Pod describes a running pod's start and end time within a Window and
  1946. // all the Allocations (i.e. containers) contained within it.
  1947. type Pod struct {
  1948. Window kubecost.Window
  1949. Start time.Time
  1950. End time.Time
  1951. Key podKey
  1952. Allocations map[string]*kubecost.Allocation
  1953. }
  1954. // AppendContainer adds an entry for the given container name to the Pod.
  1955. func (p Pod) AppendContainer(container string) {
  1956. name := fmt.Sprintf("%s/%s/%s/%s", p.Key.Cluster, p.Key.Namespace, p.Key.Pod, container)
  1957. alloc := &kubecost.Allocation{
  1958. Name: name,
  1959. Properties: &kubecost.AllocationProperties{},
  1960. Window: p.Window.Clone(),
  1961. Start: p.Start,
  1962. End: p.End,
  1963. }
  1964. alloc.Properties.Container = container
  1965. alloc.Properties.Pod = p.Key.Pod
  1966. alloc.Properties.Namespace = p.Key.Namespace
  1967. alloc.Properties.Cluster = p.Key.Cluster
  1968. p.Allocations[container] = alloc
  1969. }
  1970. // PVC describes a PersistentVolumeClaim
  1971. // TODO:CLEANUP move to pkg/kubecost?
  1972. // TODO:CLEANUP add PersistentVolumeClaims field to type Allocation?
  1973. type PVC struct {
  1974. Bytes float64 `json:"bytes"`
  1975. Count int `json:"count"`
  1976. Name string `json:"name"`
  1977. Cluster string `json:"cluster"`
  1978. Namespace string `json:"namespace"`
  1979. Volume *PV `json:"persistentVolume"`
  1980. Mounted bool `json:"mounted"`
  1981. Start time.Time `json:"start"`
  1982. End time.Time `json:"end"`
  1983. }
  1984. // Cost computes the cumulative cost of the PVC
  1985. func (pvc *PVC) Cost() float64 {
  1986. if pvc == nil || pvc.Volume == nil {
  1987. return 0.0
  1988. }
  1989. gib := pvc.Bytes / 1024 / 1024 / 1024
  1990. hrs := pvc.Minutes() / 60.0
  1991. return pvc.Volume.CostPerGiBHour * gib * hrs
  1992. }
  1993. // Minutes computes the number of minutes over which the PVC is defined
  1994. func (pvc *PVC) Minutes() float64 {
  1995. if pvc == nil {
  1996. return 0.0
  1997. }
  1998. return pvc.End.Sub(pvc.Start).Minutes()
  1999. }
  2000. // String returns a string representation of the PVC
  2001. func (pvc *PVC) String() string {
  2002. if pvc == nil {
  2003. return "<nil>"
  2004. }
  2005. return fmt.Sprintf("%s/%s/%s{Bytes:%.2f, Cost:%.6f, Start,End:%s}", pvc.Cluster, pvc.Namespace, pvc.Name, pvc.Bytes, pvc.Cost(), kubecost.NewWindow(&pvc.Start, &pvc.End))
  2006. }
  2007. // PV describes a PersistentVolume
  2008. // TODO:CLEANUP move to pkg/kubecost?
  2009. type PV struct {
  2010. Bytes float64 `json:"bytes"`
  2011. CostPerGiBHour float64 `json:"costPerGiBHour"`
  2012. Cluster string `json:"cluster"`
  2013. Name string `json:"name"`
  2014. StorageClass string `json:"storageClass"`
  2015. }
  2016. // String returns a string representation of the PV
  2017. func (pv *PV) String() string {
  2018. if pv == nil {
  2019. return "<nil>"
  2020. }
  2021. return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s}", pv.Cluster, pv.Name, pv.Bytes, pv.CostPerGiBHour, pv.StorageClass)
  2022. }