allocation.go 93 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/opencost/opencost/pkg/util/timeutil"
  9. "github.com/opencost/opencost/pkg/cloud"
  10. "github.com/opencost/opencost/pkg/env"
  11. "github.com/opencost/opencost/pkg/kubecost"
  12. "github.com/opencost/opencost/pkg/log"
  13. "github.com/opencost/opencost/pkg/prom"
  14. "k8s.io/apimachinery/pkg/labels"
  15. )
  16. const (
  17. queryFmtPods = `avg(kube_pod_container_status_running{}) by (pod, namespace, %s)[%s:%s]`
  18. queryFmtPodsUID = `avg(kube_pod_container_status_running{}) by (pod, namespace, uid, %s)[%s:%s]`
  19. queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s, provider_id)`
  20. queryFmtRAMRequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  21. queryFmtRAMUsageAvg = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  22. queryFmtRAMUsageMax = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  23. queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  24. queryFmtCPURequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  25. queryFmtCPUUsageAvg = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  26. queryFmtCPUUsageMax = `max(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  27. queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  28. queryFmtGPUsAllocated = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  29. queryFmtNodeCostPerCPUHr = `avg(avg_over_time(node_cpu_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
  30. queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
  31. queryFmtNodeCostPerGPUHr = `avg(avg_over_time(node_gpu_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
  32. queryFmtNodeIsSpot = `avg_over_time(kubecost_node_is_spot[%s])`
  33. queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]`
  34. queryFmtPVBytes = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (persistentvolume, %s)`
  35. queryFmtPodPVCAllocation = `avg(avg_over_time(pod_pvc_allocation[%s])) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
  36. queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s])) by (persistentvolumeclaim, namespace, %s)`
  37. queryFmtPVCostPerGiBHour = `avg(avg_over_time(pv_hourly_cost[%s])) by (volumename, %s)`
  38. queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  39. queryFmtNetZoneCostPerGiB = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s])) by (%s)`
  40. queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  41. queryFmtNetRegionCostPerGiB = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s])) by (%s)`
  42. queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  43. queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s])) by (%s)`
  44. queryFmtNetReceiveBytes = `sum(increase(container_network_receive_bytes_total{pod!="", container="POD"}[%s])) by (pod_name, pod, namespace, %s)`
  45. queryFmtNetTransferBytes = `sum(increase(container_network_transmit_bytes_total{pod!="", container="POD"}[%s])) by (pod_name, pod, namespace, %s)`
  46. queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels[%s])`
  47. queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations[%s])`
  48. queryFmtPodLabels = `avg_over_time(kube_pod_labels[%s])`
  49. queryFmtPodAnnotations = `avg_over_time(kube_pod_annotations[%s])`
  50. queryFmtServiceLabels = `avg_over_time(service_selector_labels[%s])`
  51. queryFmtDeploymentLabels = `avg_over_time(deployment_match_labels[%s])`
  52. queryFmtStatefulSetLabels = `avg_over_time(statefulSet_match_labels[%s])`
  53. queryFmtDaemonSetLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s])) by (pod, owner_name, namespace, %s)`
  54. queryFmtJobLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s])) by (pod, owner_name, namespace ,%s)`
  55. queryFmtPodsWithReplicaSetOwner = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet"}[%s])) by (pod, owner_name, namespace ,%s)`
  56. queryFmtReplicaSetsWithoutOwners = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>"}[%s])) by (replicaset, namespace, %s)`
  57. queryFmtLBCostPerHr = `avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s)`
  58. queryFmtLBActiveMins = `count(kubecost_load_balancer_cost) by (namespace, service_name, %s)[%s:%s]`
  59. )
// MAX_CPU_CAP is a bit of a hack to work around garbage data from cadvisor.
// Ideally you cap each pod to the max CPU on its node, but that involves a bit
// more complexity, as it would need to be done when allocations are joined
// with asset data.
const MAX_CPU_CAP = 512
  63. // CanCompute should return true if CostModel can act as a valid source for the
  64. // given time range. In the case of CostModel we want to attempt to compute as
  65. // long as the range starts in the past. If the CostModel ends up not having
  66. // data to match, that's okay, and should be communicated with an error
  67. // response from ComputeAllocation.
  68. func (cm *CostModel) CanCompute(start, end time.Time) bool {
  69. return start.Before(time.Now())
  70. }
  71. // Name returns the name of the Source
  72. func (cm *CostModel) Name() string {
  73. return "CostModel"
  74. }
  75. // ComputeAllocation uses the CostModel instance to compute an AllocationSet
  76. // for the window defined by the given start and end times. The Allocations
  77. // returned are unaggregated (i.e. down to the container level).
  78. func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  79. // If the duration is short enough, compute the AllocationSet directly
  80. if end.Sub(start) <= cm.MaxPrometheusQueryDuration {
  81. return cm.computeAllocation(start, end, resolution)
  82. }
  83. // If the duration exceeds the configured MaxPrometheusQueryDuration, then
  84. // query for maximum-sized AllocationSets, collect them, and accumulate.
  85. // s and e track the coverage of the entire given window over multiple
  86. // internal queries.
  87. s, e := start, start
  88. // Collect AllocationSets in a range, then accumulate
  89. // TODO optimize by collecting consecutive AllocationSets, accumulating as we go
  90. asr := kubecost.NewAllocationSetRange()
  91. for e.Before(end) {
  92. // By default, query for the full remaining duration. But do not let
  93. // any individual query duration exceed the configured max Prometheus
  94. // query duration.
  95. duration := end.Sub(e)
  96. if duration > cm.MaxPrometheusQueryDuration {
  97. duration = cm.MaxPrometheusQueryDuration
  98. }
  99. // Set start and end parameters (s, e) for next individual computation.
  100. e = s.Add(duration)
  101. // Compute the individual AllocationSet for just (s, e)
  102. as, err := cm.computeAllocation(s, e, resolution)
  103. if err != nil {
  104. return kubecost.NewAllocationSet(start, end), fmt.Errorf("error computing allocation for %s: %s", kubecost.NewClosedWindow(s, e), err)
  105. }
  106. // Append to the range
  107. asr.Append(as)
  108. // Set s equal to e to set up the next query, if one exists.
  109. s = e
  110. }
  111. // Populate annotations, labels, and services on each Allocation. This is
  112. // necessary because Properties.Intersection does not propagate any values
  113. // stored in maps or slices for performance reasons. In this case, however,
  114. // it is both acceptable and necessary to do so.
  115. allocationAnnotations := map[string]map[string]string{}
  116. allocationLabels := map[string]map[string]string{}
  117. allocationServices := map[string]map[string]bool{}
  118. // Also record errors and warnings, then append them to the results later.
  119. errors := []string{}
  120. warnings := []string{}
  121. for _, as := range asr.Allocations {
  122. for k, a := range as.Allocations {
  123. if len(a.Properties.Annotations) > 0 {
  124. if _, ok := allocationAnnotations[k]; !ok {
  125. allocationAnnotations[k] = map[string]string{}
  126. }
  127. for name, val := range a.Properties.Annotations {
  128. allocationAnnotations[k][name] = val
  129. }
  130. }
  131. if len(a.Properties.Labels) > 0 {
  132. if _, ok := allocationLabels[k]; !ok {
  133. allocationLabels[k] = map[string]string{}
  134. }
  135. for name, val := range a.Properties.Labels {
  136. allocationLabels[k][name] = val
  137. }
  138. }
  139. if len(a.Properties.Services) > 0 {
  140. if _, ok := allocationServices[k]; !ok {
  141. allocationServices[k] = map[string]bool{}
  142. }
  143. for _, val := range a.Properties.Services {
  144. allocationServices[k][val] = true
  145. }
  146. }
  147. }
  148. errors = append(errors, as.Errors...)
  149. warnings = append(warnings, as.Warnings...)
  150. }
  151. // Accumulate to yield the result AllocationSet. After this step, we will
  152. // be nearly complete, but without the raw allocation data, which must be
  153. // recomputed.
  154. result, err := asr.Accumulate()
  155. if err != nil {
  156. return kubecost.NewAllocationSet(start, end), fmt.Errorf("error accumulating data for %s: %s", kubecost.NewClosedWindow(s, e), err)
  157. }
  158. // Apply the annotations, labels, and services to the post-accumulation
  159. // results. (See above for why this is necessary.)
  160. for k, a := range result.Allocations {
  161. if annotations, ok := allocationAnnotations[k]; ok {
  162. a.Properties.Annotations = annotations
  163. }
  164. if labels, ok := allocationLabels[k]; ok {
  165. a.Properties.Labels = labels
  166. }
  167. if services, ok := allocationServices[k]; ok {
  168. a.Properties.Services = []string{}
  169. for s := range services {
  170. a.Properties.Services = append(a.Properties.Services, s)
  171. }
  172. }
  173. // Expand the Window of all Allocations within the AllocationSet
  174. // to match the Window of the AllocationSet, which gets expanded
  175. // at the end of this function.
  176. a.Window = a.Window.ExpandStart(start).ExpandEnd(end)
  177. }
  178. // Maintain RAM and CPU max usage values by iterating over the range,
  179. // computing maximums on a rolling basis, and setting on the result set.
  180. for _, as := range asr.Allocations {
  181. for key, alloc := range as.Allocations {
  182. resultAlloc := result.Get(key)
  183. if resultAlloc == nil {
  184. continue
  185. }
  186. if resultAlloc.RawAllocationOnly == nil {
  187. resultAlloc.RawAllocationOnly = &kubecost.RawAllocationOnlyData{}
  188. }
  189. if alloc.RawAllocationOnly == nil {
  190. // This will happen inevitably for unmounted disks, but should
  191. // ideally not happen for any allocation with CPU and RAM data.
  192. if !alloc.IsUnmounted() {
  193. log.DedupedWarningf(10, "ComputeAllocation: raw allocation data missing for %s", key)
  194. }
  195. continue
  196. }
  197. if alloc.RawAllocationOnly.CPUCoreUsageMax > resultAlloc.RawAllocationOnly.CPUCoreUsageMax {
  198. resultAlloc.RawAllocationOnly.CPUCoreUsageMax = alloc.RawAllocationOnly.CPUCoreUsageMax
  199. }
  200. if alloc.RawAllocationOnly.RAMBytesUsageMax > resultAlloc.RawAllocationOnly.RAMBytesUsageMax {
  201. resultAlloc.RawAllocationOnly.RAMBytesUsageMax = alloc.RawAllocationOnly.RAMBytesUsageMax
  202. }
  203. }
  204. }
  205. // Expand the window to match the queried time range.
  206. result.Window = result.Window.ExpandStart(start).ExpandEnd(end)
  207. // Append errors and warnings
  208. result.Errors = errors
  209. result.Warnings = warnings
  210. return result, nil
  211. }
  212. func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  213. // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
  214. // 2. Run and apply the results of the remaining queries to
  215. // 3. Build out AllocationSet from completed Pod map
  216. // Create a window spanning the requested query
  217. window := kubecost.NewWindow(&start, &end)
  218. // Create an empty AllocationSet. For safety, in the case of an error, we
  219. // should prefer to return this empty set with the error. (In the case of
  220. // no error, of course we populate the set and return it.)
  221. allocSet := kubecost.NewAllocationSet(start, end)
  222. // (1) Build out Pod map
  223. // Build out a map of Allocations as a mapping from pod-to-container-to-
  224. // underlying-Allocation instance, starting with (start, end) so that we
  225. // begin with minutes, from which we compute resource allocation and cost
  226. // totals from measured rate data.
  227. podMap := map[podKey]*Pod{}
  228. // clusterStarts and clusterEnds record the earliest start and latest end
  229. // times, respectively, on a cluster-basis. These are used for unmounted
  230. // PVs and other "virtual" Allocations so that minutes are maximally
  231. // accurate during start-up or spin-down of a cluster
  232. clusterStart := map[string]time.Time{}
  233. clusterEnd := map[string]time.Time{}
  234. // If ingesting pod UID, we query kube_pod_container_status_running avg
  235. // by uid as well as the default values, and all podKeys/pods have their
  236. // names changed to "<pod_name> <pod_uid>". Because other metrics need
  237. // to generate keys to match pods but don't have UIDs, podUIDKeyMap
  238. // stores values of format:
  239. // default podKey : []{edited podkey 1, edited podkey 2}
  240. // This is because ingesting UID allows us to catch uncontrolled pods
  241. // with the same names. However, this will lead to a many-to-one metric
  242. // to podKey relation, so this map allows us to map the metric's
  243. // "<pod_name>" key to the edited "<pod_name> <pod_uid>" keys in podMap.
  244. ingestPodUID := env.IsIngestingPodUID()
  245. podUIDKeyMap := make(map[podKey][]podKey)
  246. if ingestPodUID {
  247. log.Debugf("CostModel.ComputeAllocation: ingesting UID data from KSM metrics...")
  248. }
  249. // TODO:CLEANUP remove "max batch" idea and clusterStart/End
  250. err := cm.buildPodMap(window, resolution, env.GetETLMaxPrometheusQueryDuration(), podMap, clusterStart, clusterEnd, ingestPodUID, podUIDKeyMap)
  251. if err != nil {
  252. log.Errorf("CostModel.ComputeAllocation: failed to build pod map: %s", err.Error())
  253. }
  254. // (2) Run and apply remaining queries
  255. // Query for the duration between start and end
  256. durStr := timeutil.DurationString(end.Sub(start))
  257. if durStr == "" {
  258. return allocSet, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  259. }
  260. // Convert resolution duration to a query-ready string
  261. resStr := timeutil.DurationString(resolution)
  262. ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
  263. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, env.GetPromClusterLabel())
  264. resChRAMBytesAllocated := ctx.QueryAtTime(queryRAMBytesAllocated, end)
  265. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, env.GetPromClusterLabel())
  266. resChRAMRequests := ctx.QueryAtTime(queryRAMRequests, end)
  267. queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, durStr, env.GetPromClusterLabel())
  268. resChRAMUsageAvg := ctx.QueryAtTime(queryRAMUsageAvg, end)
  269. queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, durStr, env.GetPromClusterLabel())
  270. resChRAMUsageMax := ctx.QueryAtTime(queryRAMUsageMax, end)
  271. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, env.GetPromClusterLabel())
  272. resChCPUCoresAllocated := ctx.QueryAtTime(queryCPUCoresAllocated, end)
  273. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, env.GetPromClusterLabel())
  274. resChCPURequests := ctx.QueryAtTime(queryCPURequests, end)
  275. queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, durStr, env.GetPromClusterLabel())
  276. resChCPUUsageAvg := ctx.QueryAtTime(queryCPUUsageAvg, end)
  277. queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMax, durStr, env.GetPromClusterLabel())
  278. resChCPUUsageMax := ctx.QueryAtTime(queryCPUUsageMax, end)
  279. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, env.GetPromClusterLabel())
  280. resChGPUsRequested := ctx.QueryAtTime(queryGPUsRequested, end)
  281. queryGPUsAllocated := fmt.Sprintf(queryFmtGPUsAllocated, durStr, env.GetPromClusterLabel())
  282. resChGPUsAllocated := ctx.QueryAtTime(queryGPUsAllocated, end)
  283. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, env.GetPromClusterLabel())
  284. resChNodeCostPerCPUHr := ctx.QueryAtTime(queryNodeCostPerCPUHr, end)
  285. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, env.GetPromClusterLabel())
  286. resChNodeCostPerRAMGiBHr := ctx.QueryAtTime(queryNodeCostPerRAMGiBHr, end)
  287. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, env.GetPromClusterLabel())
  288. resChNodeCostPerGPUHr := ctx.QueryAtTime(queryNodeCostPerGPUHr, end)
  289. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr)
  290. resChNodeIsSpot := ctx.QueryAtTime(queryNodeIsSpot, end)
  291. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, env.GetPromClusterLabel(), durStr, resStr)
  292. resChPVCInfo := ctx.QueryAtTime(queryPVCInfo, end)
  293. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, env.GetPromClusterLabel())
  294. resChPVBytes := ctx.QueryAtTime(queryPVBytes, end)
  295. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, env.GetPromClusterLabel())
  296. resChPodPVCAllocation := ctx.QueryAtTime(queryPodPVCAllocation, end)
  297. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, env.GetPromClusterLabel())
  298. resChPVCBytesRequested := ctx.QueryAtTime(queryPVCBytesRequested, end)
  299. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, env.GetPromClusterLabel())
  300. resChPVCostPerGiBHour := ctx.QueryAtTime(queryPVCostPerGiBHour, end)
  301. queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, durStr, env.GetPromClusterLabel())
  302. resChNetTransferBytes := ctx.QueryAtTime(queryNetTransferBytes, end)
  303. queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, durStr, env.GetPromClusterLabel())
  304. resChNetReceiveBytes := ctx.QueryAtTime(queryNetReceiveBytes, end)
  305. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, env.GetPromClusterLabel())
  306. resChNetZoneGiB := ctx.QueryAtTime(queryNetZoneGiB, end)
  307. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, env.GetPromClusterLabel())
  308. resChNetZoneCostPerGiB := ctx.QueryAtTime(queryNetZoneCostPerGiB, end)
  309. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, env.GetPromClusterLabel())
  310. resChNetRegionGiB := ctx.QueryAtTime(queryNetRegionGiB, end)
  311. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, env.GetPromClusterLabel())
  312. resChNetRegionCostPerGiB := ctx.QueryAtTime(queryNetRegionCostPerGiB, end)
  313. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, env.GetPromClusterLabel())
  314. resChNetInternetGiB := ctx.QueryAtTime(queryNetInternetGiB, end)
  315. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, env.GetPromClusterLabel())
  316. resChNetInternetCostPerGiB := ctx.QueryAtTime(queryNetInternetCostPerGiB, end)
  317. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr)
  318. resChNamespaceLabels := ctx.QueryAtTime(queryNamespaceLabels, end)
  319. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr)
  320. resChNamespaceAnnotations := ctx.QueryAtTime(queryNamespaceAnnotations, end)
  321. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr)
  322. resChPodLabels := ctx.QueryAtTime(queryPodLabels, end)
  323. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr)
  324. resChPodAnnotations := ctx.QueryAtTime(queryPodAnnotations, end)
  325. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr)
  326. resChServiceLabels := ctx.QueryAtTime(queryServiceLabels, end)
  327. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr)
  328. resChDeploymentLabels := ctx.QueryAtTime(queryDeploymentLabels, end)
  329. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr)
  330. resChStatefulSetLabels := ctx.QueryAtTime(queryStatefulSetLabels, end)
  331. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, env.GetPromClusterLabel())
  332. resChDaemonSetLabels := ctx.QueryAtTime(queryDaemonSetLabels, end)
  333. queryPodsWithReplicaSetOwner := fmt.Sprintf(queryFmtPodsWithReplicaSetOwner, durStr, env.GetPromClusterLabel())
  334. resChPodsWithReplicaSetOwner := ctx.QueryAtTime(queryPodsWithReplicaSetOwner, end)
  335. queryReplicaSetsWithoutOwners := fmt.Sprintf(queryFmtReplicaSetsWithoutOwners, durStr, env.GetPromClusterLabel())
  336. resChReplicaSetsWithoutOwners := ctx.QueryAtTime(queryReplicaSetsWithoutOwners, end)
  337. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, env.GetPromClusterLabel())
  338. resChJobLabels := ctx.QueryAtTime(queryJobLabels, end)
  339. queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, durStr, env.GetPromClusterLabel())
  340. resChLBCostPerHr := ctx.QueryAtTime(queryLBCostPerHr, end)
  341. queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, env.GetPromClusterLabel(), durStr, resStr)
  342. resChLBActiveMins := ctx.QueryAtTime(queryLBActiveMins, end)
  343. resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
  344. resCPURequests, _ := resChCPURequests.Await()
  345. resCPUUsageAvg, _ := resChCPUUsageAvg.Await()
  346. resCPUUsageMax, _ := resChCPUUsageMax.Await()
  347. resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
  348. resRAMRequests, _ := resChRAMRequests.Await()
  349. resRAMUsageAvg, _ := resChRAMUsageAvg.Await()
  350. resRAMUsageMax, _ := resChRAMUsageMax.Await()
  351. resGPUsRequested, _ := resChGPUsRequested.Await()
  352. resGPUsAllocated, _ := resChGPUsAllocated.Await()
  353. resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
  354. resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
  355. resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
  356. resNodeIsSpot, _ := resChNodeIsSpot.Await()
  357. resPVBytes, _ := resChPVBytes.Await()
  358. resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
  359. resPVCInfo, _ := resChPVCInfo.Await()
  360. resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
  361. resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
  362. resNetTransferBytes, _ := resChNetTransferBytes.Await()
  363. resNetReceiveBytes, _ := resChNetReceiveBytes.Await()
  364. resNetZoneGiB, _ := resChNetZoneGiB.Await()
  365. resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
  366. resNetRegionGiB, _ := resChNetRegionGiB.Await()
  367. resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
  368. resNetInternetGiB, _ := resChNetInternetGiB.Await()
  369. resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
  370. resNamespaceLabels, _ := resChNamespaceLabels.Await()
  371. resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
  372. resPodLabels, _ := resChPodLabels.Await()
  373. resPodAnnotations, _ := resChPodAnnotations.Await()
  374. resServiceLabels, _ := resChServiceLabels.Await()
  375. resDeploymentLabels, _ := resChDeploymentLabels.Await()
  376. resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
  377. resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
  378. resPodsWithReplicaSetOwner, _ := resChPodsWithReplicaSetOwner.Await()
  379. resReplicaSetsWithoutOwners, _ := resChReplicaSetsWithoutOwners.Await()
  380. resJobLabels, _ := resChJobLabels.Await()
  381. resLBCostPerHr, _ := resChLBCostPerHr.Await()
  382. resLBActiveMins, _ := resChLBActiveMins.Await()
  383. if ctx.HasErrors() {
  384. for _, err := range ctx.Errors() {
  385. log.Errorf("CostModel.ComputeAllocation: query context error %s", err)
  386. }
  387. return allocSet, ctx.ErrorCollection()
  388. }
  389. // We choose to apply allocation before requests in the cases of RAM and
  390. // CPU so that we can assert that allocation should always be greater than
  391. // or equal to request.
  392. applyCPUCoresAllocated(podMap, resCPUCoresAllocated, podUIDKeyMap)
  393. applyCPUCoresRequested(podMap, resCPURequests, podUIDKeyMap)
  394. applyCPUCoresUsedAvg(podMap, resCPUUsageAvg, podUIDKeyMap)
  395. applyCPUCoresUsedMax(podMap, resCPUUsageMax, podUIDKeyMap)
  396. applyRAMBytesAllocated(podMap, resRAMBytesAllocated, podUIDKeyMap)
  397. applyRAMBytesRequested(podMap, resRAMRequests, podUIDKeyMap)
  398. applyRAMBytesUsedAvg(podMap, resRAMUsageAvg, podUIDKeyMap)
  399. applyRAMBytesUsedMax(podMap, resRAMUsageMax, podUIDKeyMap)
  400. applyGPUsAllocated(podMap, resGPUsRequested, resGPUsAllocated, podUIDKeyMap)
  401. applyNetworkTotals(podMap, resNetTransferBytes, resNetReceiveBytes, podUIDKeyMap)
  402. applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB, podUIDKeyMap)
  403. applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB, podUIDKeyMap)
  404. applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB, podUIDKeyMap)
  405. // In the case that a two pods with the same name had different containers,
  406. // we will double-count the containers. There is no way to associate each
  407. // container with the proper pod from the usage metrics above. This will
  408. // show up as a pod having two Allocations running for the whole pod runtime.
  409. // Other than that case, Allocations should be associated with pods by the
  410. // above functions.
  411. namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
  412. podLabels := resToPodLabels(resPodLabels, podUIDKeyMap, ingestPodUID)
  413. namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
  414. podAnnotations := resToPodAnnotations(resPodAnnotations, podUIDKeyMap, ingestPodUID)
  415. applyLabels(podMap, namespaceLabels, podLabels)
  416. applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
  417. serviceLabels := getServiceLabels(resServiceLabels)
  418. allocsByService := map[serviceKey][]*kubecost.Allocation{}
  419. applyServicesToPods(podMap, podLabels, allocsByService, serviceLabels)
  420. podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
  421. podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
  422. podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels, podUIDKeyMap, ingestPodUID)
  423. podJobMap := resToPodJobMap(resJobLabels, podUIDKeyMap, ingestPodUID)
  424. podReplicaSetMap := resToPodReplicaSetMap(resPodsWithReplicaSetOwner, resReplicaSetsWithoutOwners, podUIDKeyMap, ingestPodUID)
  425. applyControllersToPods(podMap, podDeploymentMap)
  426. applyControllersToPods(podMap, podStatefulSetMap)
  427. applyControllersToPods(podMap, podDaemonSetMap)
  428. applyControllersToPods(podMap, podJobMap)
  429. applyControllersToPods(podMap, podReplicaSetMap)
  430. // TODO breakdown network costs?
  431. // Build out a map of Nodes with resource costs, discounts, and node types
  432. // for converting resource allocation data to cumulative costs.
  433. nodeMap := map[nodeKey]*NodePricing{}
  434. applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
  435. applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
  436. applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
  437. applyNodeSpot(nodeMap, resNodeIsSpot)
  438. applyNodeDiscount(nodeMap, cm)
  439. // Build out the map of all PVs with class, size and cost-per-hour.
  440. // Note: this does not record time running, which we may want to
  441. // include later for increased PV precision. (As long as the PV has
  442. // a PVC, we get time running there, so this is only inaccurate
  443. // for short-lived, unmounted PVs.)
  444. pvMap := map[pvKey]*PV{}
  445. buildPVMap(pvMap, resPVCostPerGiBHour)
  446. applyPVBytes(pvMap, resPVBytes)
  447. // Build out the map of all PVCs with time running, bytes requested,
  448. // and connect to the correct PV from pvMap. (If no PV exists, that
  449. // is noted, but does not result in any allocation/cost.)
  450. pvcMap := map[pvcKey]*PVC{}
  451. buildPVCMap(window, pvcMap, pvMap, resPVCInfo)
  452. applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
  453. // Build out the relationships of pods to their PVCs. This step
  454. // populates the PVC.Count field so that PVC allocation can be
  455. // split appropriately among each pod's container allocation.
  456. podPVCMap := map[podKey][]*PVC{}
  457. buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation, podUIDKeyMap, ingestPodUID)
  458. // Because PVCs can be shared among pods, the respective PV cost
  459. // needs to be evenly distributed to those pods based on time
  460. // running, as well as the amount of time the PVC was shared.
  461. // Build a relation between every PVC to the pods that mount it
  462. // and a window representing the interval during which they
  463. // were associated.
  464. pvcPodIntervalMap := make(map[pvcKey]map[podKey]kubecost.Window)
  465. for _, pod := range podMap {
  466. for _, alloc := range pod.Allocations {
  467. cluster := alloc.Properties.Cluster
  468. namespace := alloc.Properties.Namespace
  469. pod := alloc.Properties.Pod
  470. thisPodKey := newPodKey(cluster, namespace, pod)
  471. if pvcs, ok := podPVCMap[thisPodKey]; ok {
  472. for _, pvc := range pvcs {
  473. // Determine the (start, end) of the relationship between the
  474. // given PVC and the associated Allocation so that a precise
  475. // number of hours can be used to compute cumulative cost.
  476. s, e := alloc.Start, alloc.End
  477. if pvc.Start.After(alloc.Start) {
  478. s = pvc.Start
  479. }
  480. if pvc.End.Before(alloc.End) {
  481. e = pvc.End
  482. }
  483. thisPVCKey := newPVCKey(cluster, namespace, pvc.Name)
  484. if pvcPodIntervalMap[thisPVCKey] == nil {
  485. pvcPodIntervalMap[thisPVCKey] = make(map[podKey]kubecost.Window)
  486. }
  487. pvcPodIntervalMap[thisPVCKey][thisPodKey] = kubecost.NewWindow(&s, &e)
  488. }
  489. }
  490. // We only need to look at one alloc per pod
  491. break
  492. }
  493. }
  494. // Build out a PV price coefficient for each pod with a PVC. Each
  495. // PVC-pod relation needs a coefficient which modifies the PV cost
  496. // such that PV costs can be shared between all pods using that PVC.
  497. sharedPVCCostCoefficientMap := make(map[pvcKey]map[podKey][]CoefficientComponent)
  498. for pvcKey, podIntervalMap := range pvcPodIntervalMap {
  499. // Get single-point intervals from alloc-PVC relation windows.
  500. intervals := getIntervalPointsFromWindows(podIntervalMap)
  501. // Determine coefficients for each PVC-pod relation.
  502. sharedPVCCostCoefficientMap[pvcKey] = getPVCCostCoefficients(intervals, podIntervalMap)
  503. }
  504. // Identify unmounted PVs (PVs without PVCs) and add one Allocation per
  505. // cluster representing each cluster's unmounted PVs (if necessary).
  506. applyUnmountedPVs(window, podMap, pvMap, pvcMap)
  507. lbMap := getLoadBalancerCosts(resLBCostPerHr, resLBActiveMins, resolution)
  508. applyLoadBalancersToPods(lbMap, allocsByService)
  509. // (3) Build out AllocationSet from Pod map
  510. for _, pod := range podMap {
  511. for _, alloc := range pod.Allocations {
  512. cluster := alloc.Properties.Cluster
  513. nodeName := alloc.Properties.Node
  514. namespace := alloc.Properties.Namespace
  515. pod := alloc.Properties.Pod
  516. container := alloc.Properties.Container
  517. podKey := newPodKey(cluster, namespace, pod)
  518. nodeKey := newNodeKey(cluster, nodeName)
  519. node := cm.getNodePricing(nodeMap, nodeKey)
  520. alloc.Properties.ProviderID = node.ProviderID
  521. alloc.CPUCost = alloc.CPUCoreHours * node.CostPerCPUHr
  522. alloc.RAMCost = (alloc.RAMByteHours / 1024 / 1024 / 1024) * node.CostPerRAMGiBHr
  523. alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
  524. if pvcs, ok := podPVCMap[podKey]; ok {
  525. for _, pvc := range pvcs {
  526. pvcKey := newPVCKey(cluster, namespace, pvc.Name)
  527. s, e := alloc.Start, alloc.End
  528. if pvcInterval, ok := pvcPodIntervalMap[pvcKey][podKey]; ok {
  529. s, e = *pvcInterval.Start(), *pvcInterval.End()
  530. } else {
  531. log.Warnf("CostModel.ComputeAllocation: allocation %s and PVC %s have no associated active window", alloc.Name, pvc.Name)
  532. }
  533. minutes := e.Sub(s).Minutes()
  534. hrs := minutes / 60.0
  535. count := float64(pvc.Count)
  536. if pvc.Count < 1 {
  537. count = 1
  538. }
  539. gib := pvc.Bytes / 1024 / 1024 / 1024
  540. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  541. // Scale PV cost by PVC sharing coefficient.
  542. if coeffComponents, ok := sharedPVCCostCoefficientMap[pvcKey][podKey]; ok {
  543. cost *= getCoefficientFromComponents(coeffComponents)
  544. } else {
  545. log.Warnf("CostModel.ComputeAllocation: allocation %s and PVC %s have relation but no coeff", alloc.Name, pvc.Name)
  546. }
  547. // Apply the size and cost of the PV to the allocation, each
  548. // weighted by count (i.e. the number of containers in the pod)
  549. // record the amount of total PVBytes Hours attributable to a given PV
  550. if alloc.PVs == nil {
  551. alloc.PVs = kubecost.PVAllocations{}
  552. }
  553. pvKey := kubecost.PVKey{
  554. Cluster: pvc.Volume.Cluster,
  555. Name: pvc.Volume.Name,
  556. }
  557. alloc.PVs[pvKey] = &kubecost.PVAllocation{
  558. ByteHours: pvc.Bytes * hrs / count,
  559. Cost: cost / count,
  560. }
  561. }
  562. }
  563. // Make sure that the name is correct (node may not be present at this
  564. // point due to it missing from queryMinutes) then insert.
  565. alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, pod, container)
  566. allocSet.Set(alloc)
  567. }
  568. }
  569. return allocSet, nil
  570. }
  571. func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSize time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, ingestPodUID bool, podUIDKeyMap map[podKey][]podKey) error {
  572. // Assumes that window is positive and closed
  573. start, end := *window.Start(), *window.End()
  574. // Convert resolution duration to a query-ready string
  575. resStr := timeutil.DurationString(resolution)
  576. ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
  577. // Query for (start, end) by (pod, namespace, cluster) over the given
  578. // window, using the given resolution, and if necessary in batches no
  579. // larger than the given maximum batch size. If working in batches, track
  580. // overall progress by starting with (window.start, window.start) and
  581. // querying in batches no larger than maxBatchSize from start-to-end,
  582. // folding each result set into podMap as the results come back.
  583. coverage := kubecost.NewWindow(&start, &start)
  584. numQuery := 1
  585. for coverage.End().Before(end) {
  586. // Determine the (start, end) of the current batch
  587. batchStart := *coverage.End()
  588. batchEnd := coverage.End().Add(maxBatchSize)
  589. if batchEnd.After(end) {
  590. batchEnd = end
  591. }
  592. var resPods []*prom.QueryResult
  593. var err error
  594. maxTries := 3
  595. numTries := 0
  596. for resPods == nil && numTries < maxTries {
  597. numTries++
  598. // Query for the duration between start and end
  599. durStr := timeutil.DurationString(batchEnd.Sub(batchStart))
  600. if durStr == "" {
  601. // Negative duration, so set empty results and don't query
  602. resPods = []*prom.QueryResult{}
  603. err = nil
  604. break
  605. }
  606. // Submit and profile query
  607. var queryPods string
  608. // If ingesting UIDs, avg on them
  609. if ingestPodUID {
  610. queryPods = fmt.Sprintf(queryFmtPodsUID, env.GetPromClusterLabel(), durStr, resStr)
  611. } else {
  612. queryPods = fmt.Sprintf(queryFmtPods, env.GetPromClusterLabel(), durStr, resStr)
  613. }
  614. queryProfile := time.Now()
  615. resPods, err = ctx.QueryAtTime(queryPods, batchEnd).Await()
  616. if err != nil {
  617. log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
  618. resPods = nil
  619. }
  620. }
  621. if err != nil {
  622. return err
  623. }
  624. // queryFmtPodsUID will return both UID-containing results, and non-UID-containing results,
  625. // so filter out the non-containing results so we don't duplicate pods. This is due to the
  626. // default setup of Kubecost having replicated kube_pod_container_status_running and
  627. // included KSM kube_pod_container_status_running. Querying w/ UID will return both.
  628. if ingestPodUID {
  629. var resPodsUID []*prom.QueryResult
  630. for _, res := range resPods {
  631. _, err := res.GetString("uid")
  632. if err == nil {
  633. resPodsUID = append(resPodsUID, res)
  634. }
  635. }
  636. if len(resPodsUID) > 0 {
  637. resPods = resPodsUID
  638. } else {
  639. log.DedupedWarningf(5, "CostModel.ComputeAllocation: UID ingestion enabled, but query did not return any results with UID")
  640. }
  641. }
  642. applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods, ingestPodUID, podUIDKeyMap)
  643. coverage = coverage.ExpandEnd(batchEnd)
  644. numQuery++
  645. }
  646. return nil
  647. }
  648. func applyPodResults(window kubecost.Window, resolution time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, resPods []*prom.QueryResult, ingestPodUID bool, podUIDKeyMap map[podKey][]podKey) {
  649. for _, res := range resPods {
  650. if len(res.Values) == 0 {
  651. log.Warnf("CostModel.ComputeAllocation: empty minutes result")
  652. continue
  653. }
  654. cluster, err := res.GetString(env.GetPromClusterLabel())
  655. if err != nil {
  656. cluster = env.GetClusterID()
  657. }
  658. labels, err := res.GetStrings("namespace", "pod")
  659. if err != nil {
  660. log.Warnf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
  661. continue
  662. }
  663. namespace := labels["namespace"]
  664. pod := labels["pod"]
  665. key := newPodKey(cluster, namespace, pod)
  666. // If pod UIDs are being used to ID pods, append them to the pod name in
  667. // the podKey.
  668. if ingestPodUID {
  669. uid, err := res.GetString("uid")
  670. if err != nil {
  671. log.Warnf("CostModel.ComputeAllocation: UID ingestion enabled, but query result missing field: %s", err)
  672. } else {
  673. newKey := newPodKey(cluster, namespace, pod+" "+uid)
  674. podUIDKeyMap[key] = append(podUIDKeyMap[key], newKey)
  675. key = newKey
  676. }
  677. }
  678. // allocStart and allocEnd are the timestamps of the first and last
  679. // minutes the pod was running, respectively. We subtract one resolution
  680. // from allocStart because this point will actually represent the end
  681. // of the first minute. We don't subtract from allocEnd because it
  682. // already represents the end of the last minute.
  683. var allocStart, allocEnd time.Time
  684. startAdjustmentCoeff, endAdjustmentCoeff := 1.0, 1.0
  685. for _, datum := range res.Values {
  686. t := time.Unix(int64(datum.Timestamp), 0)
  687. if allocStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  688. // Set the start timestamp to the earliest non-zero timestamp
  689. allocStart = t
  690. // Record adjustment coefficient, i.e. the portion of the start
  691. // timestamp to "ignore". That is, sometimes the value will be
  692. // 0.5, meaning that we should discount the time running by
  693. // half of the resolution the timestamp stands for.
  694. startAdjustmentCoeff = (1.0 - datum.Value)
  695. }
  696. if datum.Value > 0 && window.Contains(t) {
  697. // Set the end timestamp to the latest non-zero timestamp
  698. allocEnd = t
  699. // Record adjustment coefficient, i.e. the portion of the end
  700. // timestamp to "ignore". (See explanation above for start.)
  701. endAdjustmentCoeff = (1.0 - datum.Value)
  702. }
  703. }
  704. if allocStart.IsZero() || allocEnd.IsZero() {
  705. continue
  706. }
  707. // Adjust timestamps according to the resolution and the adjustment
  708. // coefficients, as described above. That is, count the start timestamp
  709. // from the beginning of the resolution, not the end. Then "reduce" the
  710. // start and end by the correct amount, in the case that the "running"
  711. // value of the first or last timestamp was not a full 1.0.
  712. allocStart = allocStart.Add(-resolution)
  713. // Note: the *100 and /100 are necessary because Duration is an int, so
  714. // 0.5, for instance, will be truncated, resulting in no adjustment.
  715. allocStart = allocStart.Add(time.Duration(startAdjustmentCoeff*100) * resolution / time.Duration(100))
  716. allocEnd = allocEnd.Add(-time.Duration(endAdjustmentCoeff*100) * resolution / time.Duration(100))
  717. // Ensure that the allocStart is always within the window, adjusting
  718. // for the occasions where start falls 1m before the query window.
  719. // NOTE: window here will always be closed (so no need to nil check
  720. // "start").
  721. // TODO:CLEANUP revisit query methodology to figure out why this is
  722. // happening on occasion
  723. if allocStart.Before(*window.Start()) {
  724. allocStart = *window.Start()
  725. }
  726. // If there is only one point with a value <= 0.5 that the start and
  727. // end timestamps both share, then we will enter this case because at
  728. // least half of a resolution will be subtracted from both the start
  729. // and the end. If that is the case, then add back half of each side
  730. // so that the pod is said to run for half a resolution total.
  731. // e.g. For resolution 1m and a value of 0.5 at one timestamp, we'll
  732. // end up with allocEnd == allocStart and each coeff == 0.5. In
  733. // that case, add 0.25m to each side, resulting in 0.5m duration.
  734. if !allocEnd.After(allocStart) {
  735. allocStart = allocStart.Add(-time.Duration(50*startAdjustmentCoeff) * resolution / time.Duration(100))
  736. allocEnd = allocEnd.Add(time.Duration(50*endAdjustmentCoeff) * resolution / time.Duration(100))
  737. }
  738. // Ensure that the allocEnf is always within the window, adjusting
  739. // for the occasions where end falls 1m after the query window. This
  740. // has not ever happened, but is symmetrical with the start check
  741. // above.
  742. // NOTE: window here will always be closed (so no need to nil check
  743. // "end").
  744. // TODO:CLEANUP revisit query methodology to figure out why this is
  745. // happening on occasion
  746. if allocEnd.After(*window.End()) {
  747. allocEnd = *window.End()
  748. }
  749. // Set start if unset or this datum's start time is earlier than the
  750. // current earliest time.
  751. if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
  752. clusterStart[cluster] = allocStart
  753. }
  754. // Set end if unset or this datum's end time is later than the
  755. // current latest time.
  756. if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
  757. clusterEnd[cluster] = allocEnd
  758. }
  759. if pod, ok := podMap[key]; ok {
  760. // Pod has already been recorded, so update it accordingly
  761. if allocStart.Before(pod.Start) {
  762. pod.Start = allocStart
  763. }
  764. if allocEnd.After(pod.End) {
  765. pod.End = allocEnd
  766. }
  767. } else {
  768. // Pod has not been recorded yet, so insert it
  769. podMap[key] = &Pod{
  770. Window: window.Clone(),
  771. Start: allocStart,
  772. End: allocEnd,
  773. Key: key,
  774. Allocations: map[string]*kubecost.Allocation{},
  775. }
  776. }
  777. }
  778. }
  779. func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  780. for _, res := range resCPUCoresAllocated {
  781. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  782. if err != nil {
  783. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
  784. continue
  785. }
  786. container, err := res.GetString("container")
  787. if err != nil {
  788. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'container': %s", key)
  789. continue
  790. }
  791. var pods []*Pod
  792. pod, ok := podMap[key]
  793. if !ok {
  794. if uidKeys, ok := podUIDKeyMap[key]; ok {
  795. for _, uidKey := range uidKeys {
  796. pod, ok = podMap[uidKey]
  797. if ok {
  798. pods = append(pods, pod)
  799. }
  800. }
  801. } else {
  802. continue
  803. }
  804. } else {
  805. pods = []*Pod{pod}
  806. }
  807. for _, pod := range pods {
  808. if _, ok := pod.Allocations[container]; !ok {
  809. pod.AppendContainer(container)
  810. }
  811. cpuCores := res.Values[0].Value
  812. if cpuCores > MAX_CPU_CAP {
  813. log.Infof("[WARNING] Very large cpu allocation, clamping to %f", res.Values[0].Value*(pod.Allocations[container].Minutes()/60.0))
  814. cpuCores = 0.0
  815. }
  816. hours := pod.Allocations[container].Minutes() / 60.0
  817. pod.Allocations[container].CPUCoreHours = cpuCores * hours
  818. node, err := res.GetString("node")
  819. if err != nil {
  820. log.Warnf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
  821. continue
  822. }
  823. pod.Allocations[container].Properties.Node = node
  824. }
  825. }
  826. }
  827. func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  828. for _, res := range resCPUCoresRequested {
  829. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  830. if err != nil {
  831. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
  832. continue
  833. }
  834. container, err := res.GetString("container")
  835. if err != nil {
  836. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'container': %s", key)
  837. continue
  838. }
  839. var pods []*Pod
  840. pod, ok := podMap[key]
  841. if !ok {
  842. if uidKeys, ok := podUIDKeyMap[key]; ok {
  843. for _, uidKey := range uidKeys {
  844. pod, ok = podMap[uidKey]
  845. if ok {
  846. pods = append(pods, pod)
  847. }
  848. }
  849. } else {
  850. continue
  851. }
  852. } else {
  853. pods = []*Pod{pod}
  854. }
  855. for _, pod := range pods {
  856. if _, ok := pod.Allocations[container]; !ok {
  857. pod.AppendContainer(container)
  858. }
  859. pod.Allocations[container].CPUCoreRequestAverage = res.Values[0].Value
  860. // If CPU allocation is less than requests, set CPUCoreHours to
  861. // request level.
  862. if pod.Allocations[container].CPUCores() < res.Values[0].Value {
  863. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  864. }
  865. if pod.Allocations[container].CPUCores() > MAX_CPU_CAP {
  866. log.Infof("[WARNING] Very large cpu allocation, clamping! to %f", res.Values[0].Value*(pod.Allocations[container].Minutes()/60.0))
  867. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  868. }
  869. node, err := res.GetString("node")
  870. if err != nil {
  871. log.Warnf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
  872. continue
  873. }
  874. pod.Allocations[container].Properties.Node = node
  875. }
  876. }
  877. }
  878. func applyCPUCoresUsedAvg(podMap map[podKey]*Pod, resCPUCoresUsedAvg []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  879. for _, res := range resCPUCoresUsedAvg {
  880. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  881. if err != nil {
  882. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg result missing field: %s", err)
  883. continue
  884. }
  885. container, err := res.GetString("container")
  886. if container == "" || err != nil {
  887. container, err = res.GetString("container_name")
  888. if err != nil {
  889. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg query result missing 'container': %s", key)
  890. continue
  891. }
  892. }
  893. var pods []*Pod
  894. pod, ok := podMap[key]
  895. if !ok {
  896. if uidKeys, ok := podUIDKeyMap[key]; ok {
  897. for _, uidKey := range uidKeys {
  898. pod, ok = podMap[uidKey]
  899. if ok {
  900. pods = append(pods, pod)
  901. }
  902. }
  903. } else {
  904. continue
  905. }
  906. } else {
  907. pods = []*Pod{pod}
  908. }
  909. for _, pod := range pods {
  910. if _, ok := pod.Allocations[container]; !ok {
  911. pod.AppendContainer(container)
  912. }
  913. pod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
  914. if res.Values[0].Value > MAX_CPU_CAP {
  915. log.Infof("[WARNING] Very large cpu USAGE, dropping outlier")
  916. pod.Allocations[container].CPUCoreUsageAverage = 0.0
  917. }
  918. }
  919. }
  920. }
  921. func applyCPUCoresUsedMax(podMap map[podKey]*Pod, resCPUCoresUsedMax []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  922. for _, res := range resCPUCoresUsedMax {
  923. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  924. if err != nil {
  925. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max result missing field: %s", err)
  926. continue
  927. }
  928. container, err := res.GetString("container")
  929. if container == "" || err != nil {
  930. container, err = res.GetString("container_name")
  931. if err != nil {
  932. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max query result missing 'container': %s", key)
  933. continue
  934. }
  935. }
  936. var pods []*Pod
  937. pod, ok := podMap[key]
  938. if !ok {
  939. if uidKeys, ok := podUIDKeyMap[key]; ok {
  940. for _, uidKey := range uidKeys {
  941. pod, ok = podMap[uidKey]
  942. if ok {
  943. pods = append(pods, pod)
  944. }
  945. }
  946. } else {
  947. continue
  948. }
  949. } else {
  950. pods = []*Pod{pod}
  951. }
  952. for _, pod := range pods {
  953. if _, ok := pod.Allocations[container]; !ok {
  954. pod.AppendContainer(container)
  955. }
  956. if pod.Allocations[container].RawAllocationOnly == nil {
  957. pod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  958. CPUCoreUsageMax: res.Values[0].Value,
  959. }
  960. } else {
  961. pod.Allocations[container].RawAllocationOnly.CPUCoreUsageMax = res.Values[0].Value
  962. }
  963. }
  964. }
  965. }
  966. func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  967. for _, res := range resRAMBytesAllocated {
  968. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  969. if err != nil {
  970. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
  971. continue
  972. }
  973. container, err := res.GetString("container")
  974. if err != nil {
  975. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'container': %s", key)
  976. continue
  977. }
  978. var pods []*Pod
  979. pod, ok := podMap[key]
  980. if !ok {
  981. if uidKeys, ok := podUIDKeyMap[key]; ok {
  982. for _, uidKey := range uidKeys {
  983. pod, ok = podMap[uidKey]
  984. if ok {
  985. pods = append(pods, pod)
  986. }
  987. }
  988. } else {
  989. continue
  990. }
  991. } else {
  992. pods = []*Pod{pod}
  993. }
  994. for _, pod := range pods {
  995. if _, ok := pod.Allocations[container]; !ok {
  996. pod.AppendContainer(container)
  997. }
  998. ramBytes := res.Values[0].Value
  999. hours := pod.Allocations[container].Minutes() / 60.0
  1000. pod.Allocations[container].RAMByteHours = ramBytes * hours
  1001. node, err := res.GetString("node")
  1002. if err != nil {
  1003. log.Warnf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
  1004. continue
  1005. }
  1006. pod.Allocations[container].Properties.Node = node
  1007. }
  1008. }
  1009. }
  1010. func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  1011. for _, res := range resRAMBytesRequested {
  1012. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1013. if err != nil {
  1014. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
  1015. continue
  1016. }
  1017. container, err := res.GetString("container")
  1018. if err != nil {
  1019. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'container': %s", key)
  1020. continue
  1021. }
  1022. var pods []*Pod
  1023. pod, ok := podMap[key]
  1024. if !ok {
  1025. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1026. for _, uidKey := range uidKeys {
  1027. pod, ok = podMap[uidKey]
  1028. if ok {
  1029. pods = append(pods, pod)
  1030. }
  1031. }
  1032. } else {
  1033. continue
  1034. }
  1035. } else {
  1036. pods = []*Pod{pod}
  1037. }
  1038. for _, pod := range pods {
  1039. if _, ok := pod.Allocations[container]; !ok {
  1040. pod.AppendContainer(container)
  1041. }
  1042. pod.Allocations[container].RAMBytesRequestAverage = res.Values[0].Value
  1043. // If RAM allocation is less than requests, set RAMByteHours to
  1044. // request level.
  1045. if pod.Allocations[container].RAMBytes() < res.Values[0].Value {
  1046. pod.Allocations[container].RAMByteHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  1047. }
  1048. node, err := res.GetString("node")
  1049. if err != nil {
  1050. log.Warnf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
  1051. continue
  1052. }
  1053. pod.Allocations[container].Properties.Node = node
  1054. }
  1055. }
  1056. }
  1057. func applyRAMBytesUsedAvg(podMap map[podKey]*Pod, resRAMBytesUsedAvg []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  1058. for _, res := range resRAMBytesUsedAvg {
  1059. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1060. if err != nil {
  1061. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM avg usage result missing field: %s", err)
  1062. continue
  1063. }
  1064. container, err := res.GetString("container")
  1065. if container == "" || err != nil {
  1066. container, err = res.GetString("container_name")
  1067. if err != nil {
  1068. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage avg query result missing 'container': %s", key)
  1069. continue
  1070. }
  1071. }
  1072. var pods []*Pod
  1073. pod, ok := podMap[key]
  1074. if !ok {
  1075. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1076. for _, uidKey := range uidKeys {
  1077. pod, ok = podMap[uidKey]
  1078. if ok {
  1079. pods = append(pods, pod)
  1080. }
  1081. }
  1082. } else {
  1083. continue
  1084. }
  1085. } else {
  1086. pods = []*Pod{pod}
  1087. }
  1088. for _, pod := range pods {
  1089. if _, ok := pod.Allocations[container]; !ok {
  1090. pod.AppendContainer(container)
  1091. }
  1092. pod.Allocations[container].RAMBytesUsageAverage = res.Values[0].Value
  1093. }
  1094. }
  1095. }
  1096. func applyRAMBytesUsedMax(podMap map[podKey]*Pod, resRAMBytesUsedMax []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  1097. for _, res := range resRAMBytesUsedMax {
  1098. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1099. if err != nil {
  1100. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max result missing field: %s", err)
  1101. continue
  1102. }
  1103. container, err := res.GetString("container")
  1104. if container == "" || err != nil {
  1105. container, err = res.GetString("container_name")
  1106. if err != nil {
  1107. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max query result missing 'container': %s", key)
  1108. continue
  1109. }
  1110. }
  1111. var pods []*Pod
  1112. pod, ok := podMap[key]
  1113. if !ok {
  1114. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1115. for _, uidKey := range uidKeys {
  1116. pod, ok = podMap[uidKey]
  1117. if ok {
  1118. pods = append(pods, pod)
  1119. }
  1120. }
  1121. } else {
  1122. continue
  1123. }
  1124. } else {
  1125. pods = []*Pod{pod}
  1126. }
  1127. for _, pod := range pods {
  1128. if _, ok := pod.Allocations[container]; !ok {
  1129. pod.AppendContainer(container)
  1130. }
  1131. if pod.Allocations[container].RawAllocationOnly == nil {
  1132. pod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  1133. RAMBytesUsageMax: res.Values[0].Value,
  1134. }
  1135. } else {
  1136. pod.Allocations[container].RawAllocationOnly.RAMBytesUsageMax = res.Values[0].Value
  1137. }
  1138. }
  1139. }
  1140. }
  1141. func applyGPUsAllocated(podMap map[podKey]*Pod, resGPUsRequested []*prom.QueryResult, resGPUsAllocated []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  1142. if len(resGPUsAllocated) > 0 { // Use the new query, when it's become available in a window
  1143. resGPUsRequested = resGPUsAllocated
  1144. }
  1145. for _, res := range resGPUsRequested {
  1146. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1147. if err != nil {
  1148. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
  1149. continue
  1150. }
  1151. container, err := res.GetString("container")
  1152. if err != nil {
  1153. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request query result missing 'container': %s", key)
  1154. continue
  1155. }
  1156. var pods []*Pod
  1157. pod, ok := podMap[key]
  1158. if !ok {
  1159. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1160. for _, uidKey := range uidKeys {
  1161. pod, ok = podMap[uidKey]
  1162. if ok {
  1163. pods = append(pods, pod)
  1164. }
  1165. }
  1166. } else {
  1167. continue
  1168. }
  1169. } else {
  1170. pods = []*Pod{pod}
  1171. }
  1172. for _, pod := range pods {
  1173. if _, ok := pod.Allocations[container]; !ok {
  1174. pod.AppendContainer(container)
  1175. }
  1176. hrs := pod.Allocations[container].Minutes() / 60.0
  1177. pod.Allocations[container].GPUHours = res.Values[0].Value * hrs
  1178. }
  1179. }
  1180. }
  1181. func applyNetworkTotals(podMap map[podKey]*Pod, resNetworkTransferBytes []*prom.QueryResult, resNetworkReceiveBytes []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  1182. for _, res := range resNetworkTransferBytes {
  1183. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1184. if err != nil {
  1185. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Transfer Bytes query result missing field: %s", err)
  1186. continue
  1187. }
  1188. var pods []*Pod
  1189. pod, ok := podMap[podKey]
  1190. if !ok {
  1191. if uidKeys, ok := podUIDKeyMap[podKey]; ok {
  1192. for _, uidKey := range uidKeys {
  1193. pod, ok = podMap[uidKey]
  1194. if ok {
  1195. pods = append(pods, pod)
  1196. }
  1197. }
  1198. } else {
  1199. continue
  1200. }
  1201. } else {
  1202. pods = []*Pod{pod}
  1203. }
  1204. for _, pod := range pods {
  1205. for _, alloc := range pod.Allocations {
  1206. alloc.NetworkTransferBytes = res.Values[0].Value / float64(len(pod.Allocations)) / float64(len(pods))
  1207. }
  1208. }
  1209. }
  1210. for _, res := range resNetworkReceiveBytes {
  1211. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1212. if err != nil {
  1213. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Receive Bytes query result missing field: %s", err)
  1214. continue
  1215. }
  1216. var pods []*Pod
  1217. pod, ok := podMap[podKey]
  1218. if !ok {
  1219. if uidKeys, ok := podUIDKeyMap[podKey]; ok {
  1220. for _, uidKey := range uidKeys {
  1221. pod, ok = podMap[uidKey]
  1222. if ok {
  1223. pods = append(pods, pod)
  1224. }
  1225. }
  1226. } else {
  1227. continue
  1228. }
  1229. } else {
  1230. pods = []*Pod{pod}
  1231. }
  1232. for _, pod := range pods {
  1233. for _, alloc := range pod.Allocations {
  1234. alloc.NetworkReceiveBytes = res.Values[0].Value / float64(len(pod.Allocations)) / float64(len(pods))
  1235. }
  1236. }
  1237. }
  1238. }
  1239. func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  1240. costPerGiBByCluster := map[string]float64{}
  1241. for _, res := range resNetworkCostPerGiB {
  1242. cluster, err := res.GetString(env.GetPromClusterLabel())
  1243. if err != nil {
  1244. cluster = env.GetClusterID()
  1245. }
  1246. costPerGiBByCluster[cluster] = res.Values[0].Value
  1247. }
  1248. for _, res := range resNetworkGiB {
  1249. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1250. if err != nil {
  1251. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
  1252. continue
  1253. }
  1254. var pods []*Pod
  1255. pod, ok := podMap[podKey]
  1256. if !ok {
  1257. if uidKeys, ok := podUIDKeyMap[podKey]; ok {
  1258. for _, uidKey := range uidKeys {
  1259. pod, ok = podMap[uidKey]
  1260. if ok {
  1261. pods = append(pods, pod)
  1262. }
  1263. }
  1264. } else {
  1265. continue
  1266. }
  1267. } else {
  1268. pods = []*Pod{pod}
  1269. }
  1270. for _, pod := range pods {
  1271. for _, alloc := range pod.Allocations {
  1272. gib := res.Values[0].Value / float64(len(pod.Allocations))
  1273. costPerGiB := costPerGiBByCluster[podKey.Cluster]
  1274. alloc.NetworkCost = gib * costPerGiB / float64(len(pods))
  1275. }
  1276. }
  1277. }
  1278. }
  1279. func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[namespaceKey]map[string]string {
  1280. namespaceLabels := map[namespaceKey]map[string]string{}
  1281. for _, res := range resNamespaceLabels {
  1282. nsKey, err := resultNamespaceKey(res, env.GetPromClusterLabel(), "namespace")
  1283. if err != nil {
  1284. continue
  1285. }
  1286. if _, ok := namespaceLabels[nsKey]; !ok {
  1287. namespaceLabels[nsKey] = map[string]string{}
  1288. }
  1289. for k, l := range res.GetLabels() {
  1290. namespaceLabels[nsKey][k] = l
  1291. }
  1292. }
  1293. return namespaceLabels
  1294. }
  1295. func resToPodLabels(resPodLabels []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) map[podKey]map[string]string {
  1296. podLabels := map[podKey]map[string]string{}
  1297. for _, res := range resPodLabels {
  1298. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1299. if err != nil {
  1300. continue
  1301. }
  1302. var keys []podKey
  1303. if ingestPodUID {
  1304. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1305. keys = append(keys, uidKeys...)
  1306. }
  1307. } else {
  1308. keys = []podKey{key}
  1309. }
  1310. for _, key := range keys {
  1311. if _, ok := podLabels[key]; !ok {
  1312. podLabels[key] = map[string]string{}
  1313. }
  1314. for k, l := range res.GetLabels() {
  1315. podLabels[key][k] = l
  1316. }
  1317. }
  1318. }
  1319. return podLabels
  1320. }
  1321. func resToNamespaceAnnotations(resNamespaceAnnotations []*prom.QueryResult) map[string]map[string]string {
  1322. namespaceAnnotations := map[string]map[string]string{}
  1323. for _, res := range resNamespaceAnnotations {
  1324. namespace, err := res.GetString("namespace")
  1325. if err != nil {
  1326. continue
  1327. }
  1328. if _, ok := namespaceAnnotations[namespace]; !ok {
  1329. namespaceAnnotations[namespace] = map[string]string{}
  1330. }
  1331. for k, l := range res.GetAnnotations() {
  1332. namespaceAnnotations[namespace][k] = l
  1333. }
  1334. }
  1335. return namespaceAnnotations
  1336. }
  1337. func resToPodAnnotations(resPodAnnotations []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) map[podKey]map[string]string {
  1338. podAnnotations := map[podKey]map[string]string{}
  1339. for _, res := range resPodAnnotations {
  1340. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1341. if err != nil {
  1342. continue
  1343. }
  1344. var keys []podKey
  1345. if ingestPodUID {
  1346. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1347. keys = append(keys, uidKeys...)
  1348. }
  1349. } else {
  1350. keys = []podKey{key}
  1351. }
  1352. for _, key := range keys {
  1353. if _, ok := podAnnotations[key]; !ok {
  1354. podAnnotations[key] = map[string]string{}
  1355. }
  1356. for k, l := range res.GetAnnotations() {
  1357. podAnnotations[key][k] = l
  1358. }
  1359. }
  1360. }
  1361. return podAnnotations
  1362. }
  1363. func applyLabels(podMap map[podKey]*Pod, namespaceLabels map[namespaceKey]map[string]string, podLabels map[podKey]map[string]string) {
  1364. for podKey, pod := range podMap {
  1365. for _, alloc := range pod.Allocations {
  1366. allocLabels := alloc.Properties.Labels
  1367. if allocLabels == nil {
  1368. allocLabels = make(map[string]string)
  1369. }
  1370. // Apply namespace labels first, then pod labels so that pod labels
  1371. // overwrite namespace labels.
  1372. nsKey := podKey.namespaceKey // newNamespaceKey(podKey.Cluster, podKey.Namespace)
  1373. if labels, ok := namespaceLabels[nsKey]; ok {
  1374. for k, v := range labels {
  1375. allocLabels[k] = v
  1376. }
  1377. }
  1378. if labels, ok := podLabels[podKey]; ok {
  1379. for k, v := range labels {
  1380. allocLabels[k] = v
  1381. }
  1382. }
  1383. alloc.Properties.Labels = allocLabels
  1384. }
  1385. }
  1386. }
  1387. func applyAnnotations(podMap map[podKey]*Pod, namespaceAnnotations map[string]map[string]string, podAnnotations map[podKey]map[string]string) {
  1388. for key, pod := range podMap {
  1389. for _, alloc := range pod.Allocations {
  1390. allocAnnotations := alloc.Properties.Annotations
  1391. if allocAnnotations == nil {
  1392. allocAnnotations = make(map[string]string)
  1393. }
  1394. // Apply namespace annotations first, then pod annotations so that
  1395. // pod labels overwrite namespace labels.
  1396. if labels, ok := namespaceAnnotations[key.Namespace]; ok {
  1397. for k, v := range labels {
  1398. allocAnnotations[k] = v
  1399. }
  1400. }
  1401. if labels, ok := podAnnotations[key]; ok {
  1402. for k, v := range labels {
  1403. allocAnnotations[k] = v
  1404. }
  1405. }
  1406. alloc.Properties.Annotations = allocAnnotations
  1407. }
  1408. }
  1409. }
  1410. func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[string]string {
  1411. serviceLabels := map[serviceKey]map[string]string{}
  1412. for _, res := range resServiceLabels {
  1413. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service")
  1414. if err != nil {
  1415. continue
  1416. }
  1417. if _, ok := serviceLabels[serviceKey]; !ok {
  1418. serviceLabels[serviceKey] = map[string]string{}
  1419. }
  1420. for k, l := range res.GetLabels() {
  1421. serviceLabels[serviceKey][k] = l
  1422. }
  1423. }
  1424. // Prune duplicate services. That is, if the same service exists with
  1425. // hyphens instead of underscores, keep the one that uses hyphens.
  1426. for key := range serviceLabels {
  1427. if strings.Contains(key.Service, "_") {
  1428. duplicateService := strings.Replace(key.Service, "_", "-", -1)
  1429. duplicateKey := newServiceKey(key.Cluster, key.Namespace, duplicateService)
  1430. if _, ok := serviceLabels[duplicateKey]; ok {
  1431. delete(serviceLabels, key)
  1432. }
  1433. }
  1434. }
  1435. return serviceLabels
  1436. }
  1437. func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  1438. deploymentLabels := map[controllerKey]map[string]string{}
  1439. for _, res := range resDeploymentLabels {
  1440. controllerKey, err := resultDeploymentKey(res, env.GetPromClusterLabel(), "namespace", "deployment")
  1441. if err != nil {
  1442. continue
  1443. }
  1444. if _, ok := deploymentLabels[controllerKey]; !ok {
  1445. deploymentLabels[controllerKey] = map[string]string{}
  1446. }
  1447. for k, l := range res.GetLabels() {
  1448. deploymentLabels[controllerKey][k] = l
  1449. }
  1450. }
  1451. // Prune duplicate deployments. That is, if the same deployment exists with
  1452. // hyphens instead of underscores, keep the one that uses hyphens.
  1453. for key := range deploymentLabels {
  1454. if strings.Contains(key.Controller, "_") {
  1455. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  1456. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  1457. if _, ok := deploymentLabels[duplicateKey]; ok {
  1458. delete(deploymentLabels, key)
  1459. }
  1460. }
  1461. }
  1462. return deploymentLabels
  1463. }
  1464. func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  1465. statefulSetLabels := map[controllerKey]map[string]string{}
  1466. for _, res := range resStatefulSetLabels {
  1467. controllerKey, err := resultStatefulSetKey(res, env.GetPromClusterLabel(), "namespace", "statefulSet")
  1468. if err != nil {
  1469. continue
  1470. }
  1471. if _, ok := statefulSetLabels[controllerKey]; !ok {
  1472. statefulSetLabels[controllerKey] = map[string]string{}
  1473. }
  1474. for k, l := range res.GetLabels() {
  1475. statefulSetLabels[controllerKey][k] = l
  1476. }
  1477. }
  1478. // Prune duplicate stateful sets. That is, if the same stateful set exists
  1479. // with hyphens instead of underscores, keep the one that uses hyphens.
  1480. for key := range statefulSetLabels {
  1481. if strings.Contains(key.Controller, "_") {
  1482. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  1483. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  1484. if _, ok := statefulSetLabels[duplicateKey]; ok {
  1485. delete(statefulSetLabels, key)
  1486. }
  1487. }
  1488. }
  1489. return statefulSetLabels
  1490. }
  1491. func labelsToPodControllerMap(podLabels map[podKey]map[string]string, controllerLabels map[controllerKey]map[string]string) map[podKey]controllerKey {
  1492. podControllerMap := map[podKey]controllerKey{}
  1493. // For each controller, turn the labels into a selector and attempt to
  1494. // match it with each set of pod labels. A match indicates that the pod
  1495. // belongs to the controller.
  1496. for cKey, cLabels := range controllerLabels {
  1497. selector := labels.Set(cLabels).AsSelectorPreValidated()
  1498. for pKey, pLabels := range podLabels {
  1499. // If the pod is in a different cluster or namespace, there is
  1500. // no need to compare the labels.
  1501. if cKey.Cluster != pKey.Cluster || cKey.Namespace != pKey.Namespace {
  1502. continue
  1503. }
  1504. podLabelSet := labels.Set(pLabels)
  1505. if selector.Matches(podLabelSet) {
  1506. if _, ok := podControllerMap[pKey]; ok {
  1507. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PodControllerMap match already exists: %s matches %s and %s", pKey, podControllerMap[pKey], cKey)
  1508. }
  1509. podControllerMap[pKey] = cKey
  1510. }
  1511. }
  1512. }
  1513. return podControllerMap
  1514. }
  1515. func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) map[podKey]controllerKey {
  1516. daemonSetLabels := map[podKey]controllerKey{}
  1517. for _, res := range resDaemonSetLabels {
  1518. controllerKey, err := resultDaemonSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1519. if err != nil {
  1520. continue
  1521. }
  1522. pod, err := res.GetString("pod")
  1523. if err != nil {
  1524. log.Warnf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
  1525. }
  1526. key := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1527. var keys []podKey
  1528. if ingestPodUID {
  1529. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1530. keys = append(keys, uidKeys...)
  1531. }
  1532. } else {
  1533. keys = []podKey{key}
  1534. }
  1535. for _, key := range keys {
  1536. daemonSetLabels[key] = controllerKey
  1537. }
  1538. }
  1539. return daemonSetLabels
  1540. }
  1541. func resToPodJobMap(resJobLabels []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) map[podKey]controllerKey {
  1542. jobLabels := map[podKey]controllerKey{}
  1543. for _, res := range resJobLabels {
  1544. controllerKey, err := resultJobKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1545. if err != nil {
  1546. continue
  1547. }
  1548. // Convert the name of Jobs generated by CronJobs to the name of the
  1549. // CronJob by stripping the timestamp off the end.
  1550. match := isCron.FindStringSubmatch(controllerKey.Controller)
  1551. if match != nil {
  1552. controllerKey.Controller = match[1]
  1553. }
  1554. pod, err := res.GetString("pod")
  1555. if err != nil {
  1556. log.Warnf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
  1557. }
  1558. key := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1559. var keys []podKey
  1560. if ingestPodUID {
  1561. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1562. keys = append(keys, uidKeys...)
  1563. }
  1564. } else {
  1565. keys = []podKey{key}
  1566. }
  1567. for _, key := range keys {
  1568. jobLabels[key] = controllerKey
  1569. }
  1570. }
  1571. return jobLabels
  1572. }
  1573. func resToPodReplicaSetMap(resPodsWithReplicaSetOwner []*prom.QueryResult, resReplicaSetsWithoutOwners []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) map[podKey]controllerKey {
  1574. // Build out set of ReplicaSets that have no owners, themselves, such that
  1575. // the ReplicaSet should be used as the owner of the Pods it controls.
  1576. // (This should exclude, for example, ReplicaSets that are controlled by
  1577. // Deployments, in which case the Deployment should be the Pod's owner.)
  1578. replicaSets := map[controllerKey]struct{}{}
  1579. for _, res := range resReplicaSetsWithoutOwners {
  1580. controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "replicaset")
  1581. if err != nil {
  1582. continue
  1583. }
  1584. replicaSets[controllerKey] = struct{}{}
  1585. }
  1586. // Create the mapping of Pods to ReplicaSets, ignoring any ReplicaSets that
  1587. // to not appear in the set of uncontrolled ReplicaSets above.
  1588. podToReplicaSet := map[podKey]controllerKey{}
  1589. for _, res := range resPodsWithReplicaSetOwner {
  1590. controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1591. if err != nil {
  1592. continue
  1593. }
  1594. if _, ok := replicaSets[controllerKey]; !ok {
  1595. continue
  1596. }
  1597. pod, err := res.GetString("pod")
  1598. if err != nil {
  1599. log.Warnf("CostModel.ComputeAllocation: ReplicaSet result without pod: %s", controllerKey)
  1600. }
  1601. key := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1602. var keys []podKey
  1603. if ingestPodUID {
  1604. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1605. keys = append(keys, uidKeys...)
  1606. }
  1607. } else {
  1608. keys = []podKey{key}
  1609. }
  1610. for _, key := range keys {
  1611. podToReplicaSet[key] = controllerKey
  1612. }
  1613. }
  1614. return podToReplicaSet
  1615. }
  1616. func applyServicesToPods(podMap map[podKey]*Pod, podLabels map[podKey]map[string]string, allocsByService map[serviceKey][]*kubecost.Allocation, serviceLabels map[serviceKey]map[string]string) {
  1617. podServicesMap := map[podKey][]serviceKey{}
  1618. // For each service, turn the labels into a selector and attempt to
  1619. // match it with each set of pod labels. A match indicates that the pod
  1620. // belongs to the service.
  1621. for sKey, sLabels := range serviceLabels {
  1622. selector := labels.Set(sLabels).AsSelectorPreValidated()
  1623. for pKey, pLabels := range podLabels {
  1624. // If the pod is in a different cluster or namespace, there is
  1625. // no need to compare the labels.
  1626. if sKey.Cluster != pKey.Cluster || sKey.Namespace != pKey.Namespace {
  1627. continue
  1628. }
  1629. podLabelSet := labels.Set(pLabels)
  1630. if selector.Matches(podLabelSet) {
  1631. if _, ok := podServicesMap[pKey]; !ok {
  1632. podServicesMap[pKey] = []serviceKey{}
  1633. }
  1634. podServicesMap[pKey] = append(podServicesMap[pKey], sKey)
  1635. }
  1636. }
  1637. }
  1638. // For each allocation in each pod, attempt to find and apply the list of
  1639. // services associated with the allocation's pod.
  1640. for key, pod := range podMap {
  1641. for _, alloc := range pod.Allocations {
  1642. if sKeys, ok := podServicesMap[key]; ok {
  1643. services := []string{}
  1644. for _, sKey := range sKeys {
  1645. services = append(services, sKey.Service)
  1646. allocsByService[sKey] = append(allocsByService[sKey], alloc)
  1647. }
  1648. alloc.Properties.Services = services
  1649. }
  1650. }
  1651. }
  1652. }
  1653. func applyControllersToPods(podMap map[podKey]*Pod, podControllerMap map[podKey]controllerKey) {
  1654. for key, pod := range podMap {
  1655. for _, alloc := range pod.Allocations {
  1656. if controllerKey, ok := podControllerMap[key]; ok {
  1657. alloc.Properties.ControllerKind = controllerKey.ControllerKind
  1658. alloc.Properties.Controller = controllerKey.Controller
  1659. }
  1660. }
  1661. }
  1662. }
  1663. func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult) {
  1664. for _, res := range resNodeCostPerCPUHr {
  1665. cluster, err := res.GetString(env.GetPromClusterLabel())
  1666. if err != nil {
  1667. cluster = env.GetClusterID()
  1668. }
  1669. node, err := res.GetString("node")
  1670. if err != nil {
  1671. log.Warnf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1672. continue
  1673. }
  1674. instanceType, err := res.GetString("instance_type")
  1675. if err != nil {
  1676. log.Warnf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1677. continue
  1678. }
  1679. providerID, err := res.GetString("provider_id")
  1680. if err != nil {
  1681. log.Warnf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1682. continue
  1683. }
  1684. key := newNodeKey(cluster, node)
  1685. if _, ok := nodeMap[key]; !ok {
  1686. nodeMap[key] = &NodePricing{
  1687. Name: node,
  1688. NodeType: instanceType,
  1689. ProviderID: cloud.ParseID(providerID),
  1690. }
  1691. }
  1692. nodeMap[key].CostPerCPUHr = res.Values[0].Value
  1693. }
  1694. }
  1695. func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult) {
  1696. for _, res := range resNodeCostPerRAMGiBHr {
  1697. cluster, err := res.GetString(env.GetPromClusterLabel())
  1698. if err != nil {
  1699. cluster = env.GetClusterID()
  1700. }
  1701. node, err := res.GetString("node")
  1702. if err != nil {
  1703. log.Warnf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1704. continue
  1705. }
  1706. instanceType, err := res.GetString("instance_type")
  1707. if err != nil {
  1708. log.Warnf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1709. continue
  1710. }
  1711. providerID, err := res.GetString("provider_id")
  1712. if err != nil {
  1713. log.Warnf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1714. continue
  1715. }
  1716. key := newNodeKey(cluster, node)
  1717. if _, ok := nodeMap[key]; !ok {
  1718. nodeMap[key] = &NodePricing{
  1719. Name: node,
  1720. NodeType: instanceType,
  1721. ProviderID: cloud.ParseID(providerID),
  1722. }
  1723. }
  1724. nodeMap[key].CostPerRAMGiBHr = res.Values[0].Value
  1725. }
  1726. }
  1727. func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult) {
  1728. for _, res := range resNodeCostPerGPUHr {
  1729. cluster, err := res.GetString(env.GetPromClusterLabel())
  1730. if err != nil {
  1731. cluster = env.GetClusterID()
  1732. }
  1733. node, err := res.GetString("node")
  1734. if err != nil {
  1735. log.Warnf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1736. continue
  1737. }
  1738. instanceType, err := res.GetString("instance_type")
  1739. if err != nil {
  1740. log.Warnf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1741. continue
  1742. }
  1743. providerID, err := res.GetString("provider_id")
  1744. if err != nil {
  1745. log.Warnf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1746. continue
  1747. }
  1748. key := newNodeKey(cluster, node)
  1749. if _, ok := nodeMap[key]; !ok {
  1750. nodeMap[key] = &NodePricing{
  1751. Name: node,
  1752. NodeType: instanceType,
  1753. ProviderID: cloud.ParseID(providerID),
  1754. }
  1755. }
  1756. nodeMap[key].CostPerGPUHr = res.Values[0].Value
  1757. }
  1758. }
  1759. func applyNodeSpot(nodeMap map[nodeKey]*NodePricing, resNodeIsSpot []*prom.QueryResult) {
  1760. for _, res := range resNodeIsSpot {
  1761. cluster, err := res.GetString(env.GetPromClusterLabel())
  1762. if err != nil {
  1763. cluster = env.GetClusterID()
  1764. }
  1765. node, err := res.GetString("node")
  1766. if err != nil {
  1767. log.Warnf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
  1768. continue
  1769. }
  1770. key := newNodeKey(cluster, node)
  1771. if _, ok := nodeMap[key]; !ok {
  1772. log.Warnf("CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
  1773. continue
  1774. }
  1775. nodeMap[key].Preemptible = res.Values[0].Value > 0
  1776. }
  1777. }
  1778. func applyNodeDiscount(nodeMap map[nodeKey]*NodePricing, cm *CostModel) {
  1779. if cm == nil {
  1780. return
  1781. }
  1782. c, err := cm.Provider.GetConfig()
  1783. if err != nil {
  1784. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1785. return
  1786. }
  1787. discount, err := ParsePercentString(c.Discount)
  1788. if err != nil {
  1789. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1790. return
  1791. }
  1792. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1793. if err != nil {
  1794. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1795. return
  1796. }
  1797. for _, node := range nodeMap {
  1798. // TODO GKE Reserved Instances into account
  1799. node.Discount = cm.Provider.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  1800. node.CostPerCPUHr *= (1.0 - node.Discount)
  1801. node.CostPerRAMGiBHr *= (1.0 - node.Discount)
  1802. }
  1803. }
  1804. func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
  1805. for _, res := range resPVCostPerGiBHour {
  1806. cluster, err := res.GetString(env.GetPromClusterLabel())
  1807. if err != nil {
  1808. cluster = env.GetClusterID()
  1809. }
  1810. name, err := res.GetString("volumename")
  1811. if err != nil {
  1812. log.Warnf("CostModel.ComputeAllocation: PV cost without volumename")
  1813. continue
  1814. }
  1815. key := newPVKey(cluster, name)
  1816. pvMap[key] = &PV{
  1817. Cluster: cluster,
  1818. Name: name,
  1819. CostPerGiBHour: res.Values[0].Value,
  1820. }
  1821. }
  1822. }
  1823. func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
  1824. for _, res := range resPVBytes {
  1825. key, err := resultPVKey(res, env.GetPromClusterLabel(), "persistentvolume")
  1826. if err != nil {
  1827. log.Warnf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
  1828. continue
  1829. }
  1830. if _, ok := pvMap[key]; !ok {
  1831. log.Warnf("CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
  1832. continue
  1833. }
  1834. pvMap[key].Bytes = res.Values[0].Value
  1835. }
  1836. }
  1837. func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey]*PV, resPVCInfo []*prom.QueryResult) {
  1838. for _, res := range resPVCInfo {
  1839. cluster, err := res.GetString(env.GetPromClusterLabel())
  1840. if err != nil {
  1841. cluster = env.GetClusterID()
  1842. }
  1843. values, err := res.GetStrings("persistentvolumeclaim", "storageclass", "volumename", "namespace")
  1844. if err != nil {
  1845. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC info query result missing field: %s", err)
  1846. continue
  1847. }
  1848. namespace := values["namespace"]
  1849. name := values["persistentvolumeclaim"]
  1850. volume := values["volumename"]
  1851. storageClass := values["storageclass"]
  1852. pvKey := newPVKey(cluster, volume)
  1853. pvcKey := newPVCKey(cluster, namespace, name)
  1854. // pvcStart and pvcEnd are the timestamps of the first and last minutes
  1855. // the PVC was running, respectively. We subtract 1m from pvcStart
  1856. // because this point will actually represent the end of the first
  1857. // minute. We don't subtract from pvcEnd because it already represents
  1858. // the end of the last minute.
  1859. var pvcStart, pvcEnd time.Time
  1860. for _, datum := range res.Values {
  1861. t := time.Unix(int64(datum.Timestamp), 0)
  1862. if pvcStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  1863. pvcStart = t
  1864. }
  1865. if datum.Value > 0 && window.Contains(t) {
  1866. pvcEnd = t
  1867. }
  1868. }
  1869. if pvcStart.IsZero() || pvcEnd.IsZero() {
  1870. log.Warnf("CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
  1871. }
  1872. pvcStart = pvcStart.Add(-time.Minute)
  1873. if _, ok := pvMap[pvKey]; !ok {
  1874. continue
  1875. }
  1876. pvMap[pvKey].StorageClass = storageClass
  1877. if _, ok := pvcMap[pvcKey]; !ok {
  1878. pvcMap[pvcKey] = &PVC{}
  1879. }
  1880. pvcMap[pvcKey].Name = name
  1881. pvcMap[pvcKey].Namespace = namespace
  1882. pvcMap[pvcKey].Cluster = cluster
  1883. pvcMap[pvcKey].Volume = pvMap[pvKey]
  1884. pvcMap[pvcKey].Start = pvcStart
  1885. pvcMap[pvcKey].End = pvcEnd
  1886. }
  1887. }
  1888. func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom.QueryResult) {
  1889. for _, res := range resPVCBytesRequested {
  1890. key, err := resultPVCKey(res, env.GetPromClusterLabel(), "namespace", "persistentvolumeclaim")
  1891. if err != nil {
  1892. continue
  1893. }
  1894. if _, ok := pvcMap[key]; !ok {
  1895. continue
  1896. }
  1897. pvcMap[key].Bytes = res.Values[0].Value
  1898. }
  1899. }
  1900. func buildPodPVCMap(podPVCMap map[podKey][]*PVC, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC, podMap map[podKey]*Pod, resPodPVCAllocation []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) {
  1901. for _, res := range resPodPVCAllocation {
  1902. cluster, err := res.GetString(env.GetPromClusterLabel())
  1903. if err != nil {
  1904. cluster = env.GetClusterID()
  1905. }
  1906. values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace")
  1907. if err != nil {
  1908. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC allocation query result missing field: %s", err)
  1909. continue
  1910. }
  1911. namespace := values["namespace"]
  1912. pod := values["pod"]
  1913. name := values["persistentvolumeclaim"]
  1914. volume := values["persistentvolume"]
  1915. key := newPodKey(cluster, namespace, pod)
  1916. pvKey := newPVKey(cluster, volume)
  1917. pvcKey := newPVCKey(cluster, namespace, name)
  1918. var keys []podKey
  1919. if ingestPodUID {
  1920. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1921. keys = append(keys, uidKeys...)
  1922. }
  1923. } else {
  1924. keys = []podKey{key}
  1925. }
  1926. for _, key := range keys {
  1927. if _, ok := pvMap[pvKey]; !ok {
  1928. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PV missing for PVC allocation query result: %s", pvKey)
  1929. continue
  1930. }
  1931. if _, ok := podPVCMap[key]; !ok {
  1932. podPVCMap[key] = []*PVC{}
  1933. }
  1934. pvc, ok := pvcMap[pvcKey]
  1935. if !ok {
  1936. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC missing for PVC allocation query: %s", pvcKey)
  1937. continue
  1938. }
  1939. count := 1
  1940. if pod, ok := podMap[key]; ok && len(pod.Allocations) > 0 {
  1941. count = len(pod.Allocations)
  1942. } else {
  1943. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC %s for missing pod %s", pvcKey, key)
  1944. }
  1945. pvc.Count = count
  1946. pvc.Mounted = true
  1947. podPVCMap[key] = append(podPVCMap[key], pvc)
  1948. }
  1949. }
  1950. }
  1951. func applyUnmountedPVs(window kubecost.Window, podMap map[podKey]*Pod, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC) {
  1952. unmountedPVBytes := map[string]float64{}
  1953. unmountedPVCost := map[string]float64{}
  1954. for _, pv := range pvMap {
  1955. mounted := false
  1956. for _, pvc := range pvcMap {
  1957. if pvc.Volume == nil {
  1958. continue
  1959. }
  1960. if pvc.Volume == pv {
  1961. mounted = true
  1962. break
  1963. }
  1964. }
  1965. if !mounted {
  1966. gib := pv.Bytes / 1024 / 1024 / 1024
  1967. hrs := window.Minutes() / 60.0 // TODO improve with PV hours, not window hours
  1968. cost := pv.CostPerGiBHour * gib * hrs
  1969. unmountedPVCost[pv.Cluster] += cost
  1970. unmountedPVBytes[pv.Cluster] += pv.Bytes
  1971. }
  1972. }
  1973. for cluster, amount := range unmountedPVCost {
  1974. container := kubecost.UnmountedSuffix
  1975. pod := kubecost.UnmountedSuffix
  1976. namespace := kubecost.UnmountedSuffix
  1977. node := ""
  1978. key := newPodKey(cluster, namespace, pod)
  1979. podMap[key] = &Pod{
  1980. Window: window.Clone(),
  1981. Start: *window.Start(),
  1982. End: *window.End(),
  1983. Key: key,
  1984. Allocations: map[string]*kubecost.Allocation{},
  1985. }
  1986. podMap[key].AppendContainer(container)
  1987. podMap[key].Allocations[container].Properties.Cluster = cluster
  1988. podMap[key].Allocations[container].Properties.Node = node
  1989. podMap[key].Allocations[container].Properties.Namespace = namespace
  1990. podMap[key].Allocations[container].Properties.Pod = pod
  1991. podMap[key].Allocations[container].Properties.Container = container
  1992. pvKey := kubecost.PVKey{
  1993. Cluster: cluster,
  1994. Name: kubecost.UnmountedSuffix,
  1995. }
  1996. unmountedPVs := kubecost.PVAllocations{
  1997. pvKey: {
  1998. ByteHours: unmountedPVBytes[cluster] * window.Minutes() / 60.0,
  1999. Cost: amount,
  2000. },
  2001. }
  2002. podMap[key].Allocations[container].PVs = podMap[key].Allocations[container].PVs.Add(unmountedPVs)
  2003. }
  2004. }
  2005. func applyUnmountedPVCs(window kubecost.Window, podMap map[podKey]*Pod, pvcMap map[pvcKey]*PVC) {
  2006. unmountedPVCBytes := map[namespaceKey]float64{}
  2007. unmountedPVCCost := map[namespaceKey]float64{}
  2008. for _, pvc := range pvcMap {
  2009. if !pvc.Mounted && pvc.Volume != nil {
  2010. key := newNamespaceKey(pvc.Cluster, pvc.Namespace)
  2011. gib := pvc.Volume.Bytes / 1024 / 1024 / 1024
  2012. hrs := pvc.Minutes() / 60.0
  2013. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  2014. unmountedPVCCost[key] += cost
  2015. unmountedPVCBytes[key] += pvc.Volume.Bytes
  2016. }
  2017. }
  2018. for key, amount := range unmountedPVCCost {
  2019. container := kubecost.UnmountedSuffix
  2020. pod := kubecost.UnmountedSuffix
  2021. namespace := key.Namespace
  2022. node := ""
  2023. cluster := key.Cluster
  2024. podKey := newPodKey(cluster, namespace, pod)
  2025. podMap[podKey] = &Pod{
  2026. Window: window.Clone(),
  2027. Start: *window.Start(),
  2028. End: *window.End(),
  2029. Key: podKey,
  2030. Allocations: map[string]*kubecost.Allocation{},
  2031. }
  2032. podMap[podKey].AppendContainer(container)
  2033. podMap[podKey].Allocations[container].Properties.Cluster = cluster
  2034. podMap[podKey].Allocations[container].Properties.Node = node
  2035. podMap[podKey].Allocations[container].Properties.Namespace = namespace
  2036. podMap[podKey].Allocations[container].Properties.Pod = pod
  2037. podMap[podKey].Allocations[container].Properties.Container = container
  2038. pvKey := kubecost.PVKey{
  2039. Cluster: cluster,
  2040. Name: kubecost.UnmountedSuffix,
  2041. }
  2042. unmountedPVs := kubecost.PVAllocations{
  2043. pvKey: {
  2044. ByteHours: unmountedPVCBytes[key] * window.Minutes() / 60.0,
  2045. Cost: amount,
  2046. },
  2047. }
  2048. podMap[podKey].Allocations[container].PVs = podMap[podKey].Allocations[container].PVs.Add(unmountedPVs)
  2049. }
  2050. }
// LB describes the start and end time of a Load Balancer along with cost
type LB struct {
	// TotalCost is the accumulated cost of the load balancer; in
	// getLoadBalancerCosts it grows by hourly price times the hours between
	// Start and End for each cost result.
	TotalCost float64
	// Start is the timestamp of the first active sample, moved back by one
	// query resolution so that the full first period is covered.
	Start time.Time
	// End is the timestamp of the last active sample.
	End time.Time
}
  2057. func getLoadBalancerCosts(resLBCost, resLBActiveMins []*prom.QueryResult, resolution time.Duration) map[serviceKey]*LB {
  2058. lbMap := make(map[serviceKey]*LB)
  2059. for _, res := range resLBActiveMins {
  2060. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
  2061. if err != nil || len(res.Values) == 0 {
  2062. continue
  2063. }
  2064. s := time.Unix(int64(res.Values[0].Timestamp), 0)
  2065. // subtract resolution from start time to cover full time period
  2066. s = s.Add(-resolution)
  2067. e := time.Unix(int64(res.Values[len(res.Values)-1].Timestamp), 0)
  2068. lbMap[serviceKey] = &LB{
  2069. Start: s,
  2070. End: e,
  2071. }
  2072. }
  2073. for _, res := range resLBCost {
  2074. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
  2075. if err != nil {
  2076. continue
  2077. }
  2078. // Apply cost as price-per-hour * hours
  2079. if lb, ok := lbMap[serviceKey]; ok {
  2080. lbPricePerHr := res.Values[0].Value
  2081. hours := lb.End.Sub(lb.Start).Hours()
  2082. lb.TotalCost += lbPricePerHr * hours
  2083. } else {
  2084. log.DedupedWarningf(20, "CostModel: found minutes for key that does not exist: %s", serviceKey)
  2085. }
  2086. }
  2087. return lbMap
  2088. }
  2089. func applyLoadBalancersToPods(lbMap map[serviceKey]*LB, allocsByService map[serviceKey][]*kubecost.Allocation) {
  2090. for sKey, lb := range lbMap {
  2091. totalHours := 0.0
  2092. allocHours := make(map[*kubecost.Allocation]float64)
  2093. // Add portion of load balancing cost to each allocation
  2094. // proportional to the total number of hours allocations used the load balancer
  2095. for _, alloc := range allocsByService[sKey] {
  2096. // Determine the (start, end) of the relationship between the
  2097. // given LB and the associated Allocation so that a precise
  2098. // number of hours can be used to compute cumulative cost.
  2099. s, e := alloc.Start, alloc.End
  2100. if lb.Start.After(alloc.Start) {
  2101. s = lb.Start
  2102. }
  2103. if lb.End.Before(alloc.End) {
  2104. e = lb.End
  2105. }
  2106. hours := e.Sub(s).Hours()
  2107. // A negative number of hours signifies no overlap between the windows
  2108. if hours > 0 {
  2109. totalHours += hours
  2110. allocHours[alloc] = hours
  2111. }
  2112. }
  2113. // Distribute cost of service once total hours is calculated
  2114. for alloc, hours := range allocHours {
  2115. alloc.LoadBalancerCost += lb.TotalCost * hours / totalHours
  2116. }
  2117. }
  2118. }
  2119. // getNodePricing determines node pricing, given a key and a mapping from keys
  2120. // to their NodePricing instances, as well as the custom pricing configuration
  2121. // inherent to the CostModel instance. If custom pricing is set, use that. If
  2122. // not, use the pricing defined by the given key. If that doesn't exist, fall
  2123. // back on custom pricing as a default.
  2124. func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*NodePricing, nodeKey nodeKey) *NodePricing {
  2125. // Find the relevant NodePricing, if it exists. If not, substitute the
  2126. // custom NodePricing as a default.
  2127. node, ok := nodeMap[nodeKey]
  2128. if !ok || node == nil {
  2129. if nodeKey.Node != "" {
  2130. log.DedupedWarningf(5, "CostModel: failed to find node for %s", nodeKey)
  2131. }
  2132. return cm.getCustomNodePricing(false)
  2133. }
  2134. // If custom pricing is enabled and can be retrieved, override detected
  2135. // node pricing with the custom values.
  2136. customPricingConfig, err := cm.Provider.GetConfig()
  2137. if err != nil {
  2138. log.Warnf("CostModel: failed to load custom pricing: %s", err)
  2139. }
  2140. if cloud.CustomPricesEnabled(cm.Provider) && customPricingConfig != nil {
  2141. return cm.getCustomNodePricing(node.Preemptible)
  2142. }
  2143. node.Source = "prometheus"
  2144. // If any of the values are NaN or zero, replace them with the custom
  2145. // values as default.
  2146. // TODO:CLEANUP can't we parse these custom prices once? why do we store
  2147. // them as strings like this?
  2148. if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
  2149. log.Warnf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
  2150. cpuCostStr := customPricingConfig.CPU
  2151. if node.Preemptible {
  2152. cpuCostStr = customPricingConfig.SpotCPU
  2153. }
  2154. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  2155. if err != nil {
  2156. log.Warnf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  2157. }
  2158. node.CostPerCPUHr = costPerCPUHr
  2159. node.Source += "/customCPU"
  2160. }
  2161. if math.IsNaN(node.CostPerGPUHr) {
  2162. log.Warnf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
  2163. gpuCostStr := customPricingConfig.GPU
  2164. if node.Preemptible {
  2165. gpuCostStr = customPricingConfig.SpotGPU
  2166. }
  2167. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  2168. if err != nil {
  2169. log.Warnf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  2170. }
  2171. node.CostPerGPUHr = costPerGPUHr
  2172. node.Source += "/customGPU"
  2173. }
  2174. if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
  2175. log.Warnf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
  2176. ramCostStr := customPricingConfig.RAM
  2177. if node.Preemptible {
  2178. ramCostStr = customPricingConfig.SpotRAM
  2179. }
  2180. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  2181. if err != nil {
  2182. log.Warnf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  2183. }
  2184. node.CostPerRAMGiBHr = costPerRAMHr
  2185. node.Source += "/customRAM"
  2186. }
  2187. return node
  2188. }
  2189. // getCustomNodePricing converts the CostModel's configured custom pricing
  2190. // values into a NodePricing instance.
  2191. func (cm *CostModel) getCustomNodePricing(spot bool) *NodePricing {
  2192. customPricingConfig, err := cm.Provider.GetConfig()
  2193. if err != nil {
  2194. return nil
  2195. }
  2196. cpuCostStr := customPricingConfig.CPU
  2197. gpuCostStr := customPricingConfig.GPU
  2198. ramCostStr := customPricingConfig.RAM
  2199. if spot {
  2200. cpuCostStr = customPricingConfig.SpotCPU
  2201. gpuCostStr = customPricingConfig.SpotGPU
  2202. ramCostStr = customPricingConfig.SpotRAM
  2203. }
  2204. node := &NodePricing{Source: "custom"}
  2205. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  2206. if err != nil {
  2207. log.Warnf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  2208. }
  2209. node.CostPerCPUHr = costPerCPUHr
  2210. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  2211. if err != nil {
  2212. log.Warnf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  2213. }
  2214. node.CostPerGPUHr = costPerGPUHr
  2215. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  2216. if err != nil {
  2217. log.Warnf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  2218. }
  2219. node.CostPerRAMGiBHr = costPerRAMHr
  2220. return node
  2221. }
// NodePricing describes the resource costs associated with a given node, as
// well as the source of the information (e.g. prometheus, custom)
type NodePricing struct {
	// Name is presumably the node's name; it is not read in this part of
	// the file (TODO confirm against where nodes are built).
	Name string
	// NodeType is passed to the provider when computing the node's combined
	// discount.
	NodeType string
	// ProviderID is presumably the cloud provider's identifier for the node;
	// it is not read in this part of the file (TODO confirm).
	ProviderID string
	// Preemptible selects the spot custom prices instead of the regular
	// ones, and is passed to the provider when computing the discount.
	Preemptible bool
	// CostPerCPUHr is the hourly CPU rate; applyNodeDiscount reduces it by
	// Discount.
	CostPerCPUHr float64
	// CostPerRAMGiBHr is the hourly rate per GiB of RAM; applyNodeDiscount
	// reduces it by Discount.
	CostPerRAMGiBHr float64
	// CostPerGPUHr is the hourly GPU rate; applyNodeDiscount leaves it
	// unchanged.
	CostPerGPUHr float64
	// Discount is the combined discount reported by the provider for this
	// node, applied as a multiplier of (1 - Discount).
	Discount float64
	// Source names where the prices came from: "custom", or "prometheus"
	// optionally followed by "/customCPU", "/customGPU" and/or "/customRAM"
	// for each price that was replaced by a custom value.
	Source string
}
// Pod describes a running pod's start and end time within a Window and
// all the Allocations (i.e. containers) contained within it.
type Pod struct {
	// Window is cloned into every Allocation created by AppendContainer.
	Window kubecost.Window
	// Start and End bound the pod's running time; AppendContainer copies
	// them into each new Allocation.
	Start time.Time
	End   time.Time
	// Key supplies the cluster, namespace and pod name of the pod.
	Key podKey
	// Allocations holds one Allocation per container, keyed by container
	// name. It must be non-nil before AppendContainer is called.
	Allocations map[string]*kubecost.Allocation
}
  2244. // AppendContainer adds an entry for the given container name to the Pod.
  2245. func (p Pod) AppendContainer(container string) {
  2246. name := fmt.Sprintf("%s/%s/%s/%s", p.Key.Cluster, p.Key.Namespace, p.Key.Pod, container)
  2247. alloc := &kubecost.Allocation{
  2248. Name: name,
  2249. Properties: &kubecost.AllocationProperties{},
  2250. Window: p.Window.Clone(),
  2251. Start: p.Start,
  2252. End: p.End,
  2253. }
  2254. alloc.Properties.Container = container
  2255. alloc.Properties.Pod = p.Key.Pod
  2256. alloc.Properties.Namespace = p.Key.Namespace
  2257. alloc.Properties.Cluster = p.Key.Cluster
  2258. p.Allocations[container] = alloc
  2259. }
// PVC describes a PersistentVolumeClaim
// TODO:CLEANUP move to pkg/kubecost?
// TODO:CLEANUP add PersistentVolumeClaims field to type Allocation?
type PVC struct {
	// Bytes is the claim's size as reported by the bytes-requested query;
	// Cost is computed from it.
	Bytes float64 `json:"bytes"`
	// Count is the number of allocations in the pod the claim was last
	// attached to, or 1 when that pod is unknown or has no allocations.
	Count int `json:"count"`
	// Name, Cluster and Namespace identify the claim.
	Name      string `json:"name"`
	Cluster   string `json:"cluster"`
	Namespace string `json:"namespace"`
	// Volume is the PV bound to the claim; it may be nil.
	Volume *PV `json:"persistentVolume"`
	// Mounted is set once a pod/PVC allocation result attaches the claim to
	// a pod.
	Mounted bool `json:"mounted"`
	// Start and End bound the time the claim was running; Start is one
	// minute before the first running sample, End is the last one.
	Start time.Time `json:"start"`
	End   time.Time `json:"end"`
}
  2274. // Cost computes the cumulative cost of the PVC
  2275. func (pvc *PVC) Cost() float64 {
  2276. if pvc == nil || pvc.Volume == nil {
  2277. return 0.0
  2278. }
  2279. gib := pvc.Bytes / 1024 / 1024 / 1024
  2280. hrs := pvc.Minutes() / 60.0
  2281. return pvc.Volume.CostPerGiBHour * gib * hrs
  2282. }
  2283. // Minutes computes the number of minutes over which the PVC is defined
  2284. func (pvc *PVC) Minutes() float64 {
  2285. if pvc == nil {
  2286. return 0.0
  2287. }
  2288. return pvc.End.Sub(pvc.Start).Minutes()
  2289. }
  2290. // String returns a string representation of the PVC
  2291. func (pvc *PVC) String() string {
  2292. if pvc == nil {
  2293. return "<nil>"
  2294. }
  2295. return fmt.Sprintf("%s/%s/%s{Bytes:%.2f, Cost:%.6f, Start,End:%s}", pvc.Cluster, pvc.Namespace, pvc.Name, pvc.Bytes, pvc.Cost(), kubecost.NewWindow(&pvc.Start, &pvc.End))
  2296. }
  2297. // PV describes a PersistentVolume
  2298. // TODO:CLEANUP move to pkg/kubecost?
  2299. type PV struct {
  2300. Bytes float64 `json:"bytes"`
  2301. CostPerGiBHour float64 `json:"costPerGiBHour"`
  2302. Cluster string `json:"cluster"`
  2303. Name string `json:"name"`
  2304. StorageClass string `json:"storageClass"`
  2305. }
  2306. // String returns a string representation of the PV
  2307. func (pv *PV) String() string {
  2308. if pv == nil {
  2309. return "<nil>"
  2310. }
  2311. return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s}", pv.Cluster, pv.Name, pv.Bytes, pv.CostPerGiBHour, pv.StorageClass)
  2312. }