allocation.go 93 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/kubecost/opencost/pkg/util/timeutil"
  9. "github.com/kubecost/opencost/pkg/cloud"
  10. "github.com/kubecost/opencost/pkg/env"
  11. "github.com/kubecost/opencost/pkg/kubecost"
  12. "github.com/kubecost/opencost/pkg/log"
  13. "github.com/kubecost/opencost/pkg/prom"
  14. "k8s.io/apimachinery/pkg/labels"
  15. )
  // Prometheus query templates used when computing allocations. Every %s verb
  // is filled in with fmt.Sprintf before the query is issued:
  //   - most templates take (duration, cluster label), in that order;
  //   - the label/annotation templates and queryFmtNodeIsSpot take only a
  //     duration;
  //   - the subquery templates ending in [%s:%s] take (cluster label,
  //     duration, resolution), as computeAllocation does for queryFmtPVCInfo
  //     and queryFmtLBActiveMins.
  16. const (
  // Pod running-status subqueries; the UID variant additionally groups by uid.
  17. queryFmtPods = `avg(kube_pod_container_status_running{}) by (pod, namespace, %s)[%s:%s]`
  18. queryFmtPodsUID = `avg(kube_pod_container_status_running{}) by (pod, namespace, uid, %s)[%s:%s]`
  // Per-container RAM: allocation, requests, and average/maximum working set.
  19. queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s, provider_id)`
  20. queryFmtRAMRequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  21. queryFmtRAMUsageAvg = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  22. queryFmtRAMUsageMax = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  // Per-container CPU: allocation, requests, and average/maximum usage rate.
  23. queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  24. queryFmtCPURequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  25. queryFmtCPUUsageAvg = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  26. queryFmtCPUUsageMax = `max(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  // Per-container GPU requests and allocation.
  27. queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  28. queryFmtGPUsAllocated = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  // Per-node hourly cost metrics (CPU, RAM, GPU) and the node spot flag.
  29. queryFmtNodeCostPerCPUHr = `avg(avg_over_time(node_cpu_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
  30. queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
  31. queryFmtNodeCostPerGPUHr = `avg(avg_over_time(node_gpu_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
  32. queryFmtNodeIsSpot = `avg_over_time(kubecost_node_is_spot[%s])`
  // Persistent volumes and claims: claim info, capacity, per-pod allocation,
  // requested bytes, and hourly cost.
  33. queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]`
  34. queryFmtPVBytes = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (persistentvolume, %s)`
  35. queryFmtPodPVCAllocation = `avg(avg_over_time(pod_pvc_allocation[%s])) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
  36. queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s])) by (persistentvolumeclaim, namespace, %s)`
  37. queryFmtPVCostPerGiBHour = `avg(avg_over_time(pv_hourly_cost[%s])) by (volumename, %s)`
  // Network egress split by destination (cross-zone, cross-region, internet).
  // The *GiB queries divide the byte increase by 1024 three times; each is
  // paired with the matching egress cost metric.
  38. queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  39. queryFmtNetZoneCostPerGiB = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s])) by (%s)`
  40. queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  41. queryFmtNetRegionCostPerGiB = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s])) by (%s)`
  42. queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  43. queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s])) by (%s)`
  // Raw received/transmitted byte increases, taken from the "POD" container series.
  44. queryFmtNetReceiveBytes = `sum(increase(container_network_receive_bytes_total{pod!="", container="POD"}[%s])) by (pod_name, pod, namespace, %s)`
  45. queryFmtNetTransferBytes = `sum(increase(container_network_transmit_bytes_total{pod!="", container="POD"}[%s])) by (pod_name, pod, namespace, %s)`
  // Label and annotation series; these templates take only a duration.
  46. queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels[%s])`
  47. queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations[%s])`
  48. queryFmtPodLabels = `avg_over_time(kube_pod_labels[%s])`
  49. queryFmtPodAnnotations = `avg_over_time(kube_pod_annotations[%s])`
  50. queryFmtServiceLabels = `avg_over_time(service_selector_labels[%s])`
  51. queryFmtDeploymentLabels = `avg_over_time(deployment_match_labels[%s])`
  52. queryFmtStatefulSetLabels = `avg_over_time(statefulSet_match_labels[%s])`
  // Pod ownership by controller kind, plus ReplicaSets that have no owner.
  53. queryFmtDaemonSetLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s])) by (pod, owner_name, namespace, %s)`
  54. queryFmtJobLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s])) by (pod, owner_name, namespace ,%s)`
  55. queryFmtPodsWithReplicaSetOwner = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet"}[%s])) by (pod, owner_name, namespace ,%s)`
  56. queryFmtReplicaSetsWithoutOwners = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>"}[%s])) by (replicaset, namespace, %s)`
  // Load balancer hourly cost and a subquery counting its cost series over the window.
  57. queryFmtLBCostPerHr = `avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s)`
  58. queryFmtLBActiveMins = `count(kubecost_load_balancer_cost) by (namespace, service_name, %s)[%s:%s]`
  59. )
  60. // MAX_CPU_CAP is a bit of a hack to work around garbage data from cadvisor.
  61. // Ideally you cap each pod to the max CPU on its node, but that involves a bit more complexity, as it would need to be done when allocations join with asset data.
  62. const MAX_CPU_CAP = 512
  63. // CanCompute should return true if CostModel can act as a valid source for the
  64. // given time range. In the case of CostModel we want to attempt to compute as
  65. // long as the range starts in the past. If the CostModel ends up not having
  66. // data to match, that's okay, and should be communicated with an error
  67. // response from ComputeAllocation.
  68. func (cm *CostModel) CanCompute(start, end time.Time) bool {
  69. return start.Before(time.Now())
  70. }
  71. // Name returns the name of the Source
  72. func (cm *CostModel) Name() string {
  73. return "CostModel"
  74. }
  75. // ComputeAllocation uses the CostModel instance to compute an AllocationSet
  76. // for the window defined by the given start and end times. The Allocations
  77. // returned are unaggregated (i.e. down to the container level).
  78. func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  79. // If the duration is short enough, compute the AllocationSet directly
  80. if end.Sub(start) <= cm.MaxPrometheusQueryDuration {
  81. return cm.computeAllocation(start, end, resolution)
  82. }
  83. // If the duration exceeds the configured MaxPrometheusQueryDuration, then
  84. // query for maximum-sized AllocationSets, collect them, and accumulate.
  85. // s and e track the coverage of the entire given window over multiple
  86. // internal queries.
  87. s, e := start, start
  88. // Collect AllocationSets in a range, then accumulate
  89. // TODO optimize by collecting consecutive AllocationSets, accumulating as we go
  90. asr := kubecost.NewAllocationSetRange()
  91. for e.Before(end) {
  92. // By default, query for the full remaining duration. But do not let
  93. // any individual query duration exceed the configured max Prometheus
  94. // query duration.
  95. duration := end.Sub(e)
  96. if duration > cm.MaxPrometheusQueryDuration {
  97. duration = cm.MaxPrometheusQueryDuration
  98. }
  99. // Set start and end parameters (s, e) for next individual computation.
  100. e = s.Add(duration)
  101. // Compute the individual AllocationSet for just (s, e)
  102. as, err := cm.computeAllocation(s, e, resolution)
  103. if err != nil {
  104. return kubecost.NewAllocationSet(start, end), fmt.Errorf("error computing allocation for %s: %s", kubecost.NewClosedWindow(s, e), err)
  105. }
  106. // Append to the range
  107. asr.Append(as)
  108. // Set s equal to e to set up the next query, if one exists.
  109. s = e
  110. }
  111. // Populate annotations, labels, and services on each Allocation. This is
  112. // necessary because Properties.Intersection does not propagate any values
  113. // stored in maps or slices for performance reasons. In this case, however,
  114. // it is both acceptable and necessary to do so.
  115. allocationAnnotations := map[string]map[string]string{}
  116. allocationLabels := map[string]map[string]string{}
  117. allocationServices := map[string]map[string]bool{}
  118. // Also record errors and warnings, then append them to the results later.
  119. errors := []string{}
  120. warnings := []string{}
  121. asr.Each(func(i int, as *kubecost.AllocationSet) {
  122. as.Each(func(k string, a *kubecost.Allocation) {
  123. if len(a.Properties.Annotations) > 0 {
  124. if _, ok := allocationAnnotations[k]; !ok {
  125. allocationAnnotations[k] = map[string]string{}
  126. }
  127. for name, val := range a.Properties.Annotations {
  128. allocationAnnotations[k][name] = val
  129. }
  130. }
  131. if len(a.Properties.Labels) > 0 {
  132. if _, ok := allocationLabels[k]; !ok {
  133. allocationLabels[k] = map[string]string{}
  134. }
  135. for name, val := range a.Properties.Labels {
  136. allocationLabels[k][name] = val
  137. }
  138. }
  139. if len(a.Properties.Services) > 0 {
  140. if _, ok := allocationServices[k]; !ok {
  141. allocationServices[k] = map[string]bool{}
  142. }
  143. for _, val := range a.Properties.Services {
  144. allocationServices[k][val] = true
  145. }
  146. }
  147. })
  148. errors = append(errors, as.Errors...)
  149. warnings = append(warnings, as.Warnings...)
  150. })
  151. // Accumulate to yield the result AllocationSet. After this step, we will
  152. // be nearly complete, but without the raw allocation data, which must be
  153. // recomputed.
  154. result, err := asr.Accumulate()
  155. if err != nil {
  156. return kubecost.NewAllocationSet(start, end), fmt.Errorf("error accumulating data for %s: %s", kubecost.NewClosedWindow(s, e), err)
  157. }
  158. // Apply the annotations, labels, and services to the post-accumulation
  159. // results. (See above for why this is necessary.)
  160. result.Each(func(k string, a *kubecost.Allocation) {
  161. if annotations, ok := allocationAnnotations[k]; ok {
  162. a.Properties.Annotations = annotations
  163. }
  164. if labels, ok := allocationLabels[k]; ok {
  165. a.Properties.Labels = labels
  166. }
  167. if services, ok := allocationServices[k]; ok {
  168. a.Properties.Services = []string{}
  169. for s := range services {
  170. a.Properties.Services = append(a.Properties.Services, s)
  171. }
  172. }
  173. // Expand the Window of all Allocations within the AllocationSet
  174. // to match the Window of the AllocationSet, which gets expanded
  175. // at the end of this function.
  176. a.Window = a.Window.ExpandStart(start).ExpandEnd(end)
  177. })
  178. // Maintain RAM and CPU max usage values by iterating over the range,
  179. // computing maximums on a rolling basis, and setting on the result set.
  180. asr.Each(func(i int, as *kubecost.AllocationSet) {
  181. as.Each(func(key string, alloc *kubecost.Allocation) {
  182. resultAlloc := result.Get(key)
  183. if resultAlloc == nil {
  184. return
  185. }
  186. if resultAlloc.RawAllocationOnly == nil {
  187. resultAlloc.RawAllocationOnly = &kubecost.RawAllocationOnlyData{}
  188. }
  189. if alloc.RawAllocationOnly == nil {
  190. // This will happen inevitably for unmounted disks, but should
  191. // ideally not happen for any allocation with CPU and RAM data.
  192. if !alloc.IsUnmounted() {
  193. log.DedupedWarningf(10, "ComputeAllocation: raw allocation data missing for %s", key)
  194. }
  195. return
  196. }
  197. if alloc.RawAllocationOnly.CPUCoreUsageMax > resultAlloc.RawAllocationOnly.CPUCoreUsageMax {
  198. resultAlloc.RawAllocationOnly.CPUCoreUsageMax = alloc.RawAllocationOnly.CPUCoreUsageMax
  199. }
  200. if alloc.RawAllocationOnly.RAMBytesUsageMax > resultAlloc.RawAllocationOnly.RAMBytesUsageMax {
  201. resultAlloc.RawAllocationOnly.RAMBytesUsageMax = alloc.RawAllocationOnly.RAMBytesUsageMax
  202. }
  203. })
  204. })
  205. // Expand the window to match the queried time range.
  206. result.Window = result.Window.ExpandStart(start).ExpandEnd(end)
  207. // Append errors and warnings
  208. result.Errors = errors
  209. result.Warnings = warnings
  210. return result, nil
  211. }
  212. func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  213. // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
  214. // 2. Run and apply the results of the remaining queries to
  215. // 3. Build out AllocationSet from completed Pod map
  216. // Create a window spanning the requested query
  217. window := kubecost.NewWindow(&start, &end)
  218. // Create an empty AllocationSet. For safety, in the case of an error, we
  219. // should prefer to return this empty set with the error. (In the case of
  220. // no error, of course we populate the set and return it.)
  221. allocSet := kubecost.NewAllocationSet(start, end)
  222. // (1) Build out Pod map
  223. // Build out a map of Allocations as a mapping from pod-to-container-to-
  224. // underlying-Allocation instance, starting with (start, end) so that we
  225. // begin with minutes, from which we compute resource allocation and cost
  226. // totals from measured rate data.
  227. podMap := map[podKey]*Pod{}
  228. // clusterStarts and clusterEnds record the earliest start and latest end
  229. // times, respectively, on a cluster-basis. These are used for unmounted
  230. // PVs and other "virtual" Allocations so that minutes are maximally
  231. // accurate during start-up or spin-down of a cluster
  232. clusterStart := map[string]time.Time{}
  233. clusterEnd := map[string]time.Time{}
  234. // If ingesting pod UID, we query kube_pod_container_status_running avg
  235. // by uid as well as the default values, and all podKeys/pods have their
  236. // names changed to "<pod_name> <pod_uid>". Because other metrics need
  237. // to generate keys to match pods but don't have UIDs, podUIDKeyMap
  238. // stores values of format:
  239. // default podKey : []{edited podkey 1, edited podkey 2}
  240. // This is because ingesting UID allows us to catch uncontrolled pods
  241. // with the same names. However, this will lead to a many-to-one metric
  242. // to podKey relation, so this map allows us to map the metric's
  243. // "<pod_name>" key to the edited "<pod_name> <pod_uid>" keys in podMap.
  244. ingestPodUID := env.IsIngestingPodUID()
  245. podUIDKeyMap := make(map[podKey][]podKey)
  246. if ingestPodUID {
  247. log.Debugf("CostModel.ComputeAllocation: ingesting UID data from KSM metrics...")
  248. }
  249. // TODO:CLEANUP remove "max batch" idea and clusterStart/End
  250. cm.buildPodMap(window, resolution, env.GetETLMaxPrometheusQueryDuration(), podMap, clusterStart, clusterEnd, ingestPodUID, podUIDKeyMap)
  251. // (2) Run and apply remaining queries
  252. // Query for the duration between start and end
  253. durStr := timeutil.DurationString(end.Sub(start))
  254. if durStr == "" {
  255. return allocSet, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  256. }
  257. // Convert resolution duration to a query-ready string
  258. resStr := timeutil.DurationString(resolution)
  259. ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
  260. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, env.GetPromClusterLabel())
  261. resChRAMBytesAllocated := ctx.QueryAtTime(queryRAMBytesAllocated, end)
  262. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, env.GetPromClusterLabel())
  263. resChRAMRequests := ctx.QueryAtTime(queryRAMRequests, end)
  264. queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, durStr, env.GetPromClusterLabel())
  265. resChRAMUsageAvg := ctx.QueryAtTime(queryRAMUsageAvg, end)
  266. queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, durStr, env.GetPromClusterLabel())
  267. resChRAMUsageMax := ctx.QueryAtTime(queryRAMUsageMax, end)
  268. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, env.GetPromClusterLabel())
  269. resChCPUCoresAllocated := ctx.QueryAtTime(queryCPUCoresAllocated, end)
  270. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, env.GetPromClusterLabel())
  271. resChCPURequests := ctx.QueryAtTime(queryCPURequests, end)
  272. queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, durStr, env.GetPromClusterLabel())
  273. resChCPUUsageAvg := ctx.QueryAtTime(queryCPUUsageAvg, end)
  274. queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMax, durStr, env.GetPromClusterLabel())
  275. resChCPUUsageMax := ctx.QueryAtTime(queryCPUUsageMax, end)
  276. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, env.GetPromClusterLabel())
  277. resChGPUsRequested := ctx.QueryAtTime(queryGPUsRequested, end)
  278. queryGPUsAllocated := fmt.Sprintf(queryFmtGPUsAllocated, durStr, env.GetPromClusterLabel())
  279. resChGPUsAllocated := ctx.QueryAtTime(queryGPUsAllocated, end)
  280. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, env.GetPromClusterLabel())
  281. resChNodeCostPerCPUHr := ctx.QueryAtTime(queryNodeCostPerCPUHr, end)
  282. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, env.GetPromClusterLabel())
  283. resChNodeCostPerRAMGiBHr := ctx.QueryAtTime(queryNodeCostPerRAMGiBHr, end)
  284. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, env.GetPromClusterLabel())
  285. resChNodeCostPerGPUHr := ctx.QueryAtTime(queryNodeCostPerGPUHr, end)
  286. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr)
  287. resChNodeIsSpot := ctx.QueryAtTime(queryNodeIsSpot, end)
  288. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, env.GetPromClusterLabel(), durStr, resStr)
  289. resChPVCInfo := ctx.QueryAtTime(queryPVCInfo, end)
  290. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, env.GetPromClusterLabel())
  291. resChPVBytes := ctx.QueryAtTime(queryPVBytes, end)
  292. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, env.GetPromClusterLabel())
  293. resChPodPVCAllocation := ctx.QueryAtTime(queryPodPVCAllocation, end)
  294. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, env.GetPromClusterLabel())
  295. resChPVCBytesRequested := ctx.QueryAtTime(queryPVCBytesRequested, end)
  296. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, env.GetPromClusterLabel())
  297. resChPVCostPerGiBHour := ctx.QueryAtTime(queryPVCostPerGiBHour, end)
  298. queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, durStr, env.GetPromClusterLabel())
  299. resChNetTransferBytes := ctx.QueryAtTime(queryNetTransferBytes, end)
  300. queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, durStr, env.GetPromClusterLabel())
  301. resChNetReceiveBytes := ctx.QueryAtTime(queryNetReceiveBytes, end)
  302. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, env.GetPromClusterLabel())
  303. resChNetZoneGiB := ctx.QueryAtTime(queryNetZoneGiB, end)
  304. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, env.GetPromClusterLabel())
  305. resChNetZoneCostPerGiB := ctx.QueryAtTime(queryNetZoneCostPerGiB, end)
  306. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, env.GetPromClusterLabel())
  307. resChNetRegionGiB := ctx.QueryAtTime(queryNetRegionGiB, end)
  308. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, env.GetPromClusterLabel())
  309. resChNetRegionCostPerGiB := ctx.QueryAtTime(queryNetRegionCostPerGiB, end)
  310. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, env.GetPromClusterLabel())
  311. resChNetInternetGiB := ctx.QueryAtTime(queryNetInternetGiB, end)
  312. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, env.GetPromClusterLabel())
  313. resChNetInternetCostPerGiB := ctx.QueryAtTime(queryNetInternetCostPerGiB, end)
  314. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr)
  315. resChNamespaceLabels := ctx.QueryAtTime(queryNamespaceLabels, end)
  316. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr)
  317. resChNamespaceAnnotations := ctx.QueryAtTime(queryNamespaceAnnotations, end)
  318. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr)
  319. resChPodLabels := ctx.QueryAtTime(queryPodLabels, end)
  320. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr)
  321. resChPodAnnotations := ctx.QueryAtTime(queryPodAnnotations, end)
  322. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr)
  323. resChServiceLabels := ctx.QueryAtTime(queryServiceLabels, end)
  324. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr)
  325. resChDeploymentLabels := ctx.QueryAtTime(queryDeploymentLabels, end)
  326. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr)
  327. resChStatefulSetLabels := ctx.QueryAtTime(queryStatefulSetLabels, end)
  328. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, env.GetPromClusterLabel())
  329. resChDaemonSetLabels := ctx.QueryAtTime(queryDaemonSetLabels, end)
  330. queryPodsWithReplicaSetOwner := fmt.Sprintf(queryFmtPodsWithReplicaSetOwner, durStr, env.GetPromClusterLabel())
  331. resChPodsWithReplicaSetOwner := ctx.QueryAtTime(queryPodsWithReplicaSetOwner, end)
  332. queryReplicaSetsWithoutOwners := fmt.Sprintf(queryFmtReplicaSetsWithoutOwners, durStr, env.GetPromClusterLabel())
  333. resChReplicaSetsWithoutOwners := ctx.QueryAtTime(queryReplicaSetsWithoutOwners, end)
  334. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, env.GetPromClusterLabel())
  335. resChJobLabels := ctx.QueryAtTime(queryJobLabels, end)
  336. queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, durStr, env.GetPromClusterLabel())
  337. resChLBCostPerHr := ctx.QueryAtTime(queryLBCostPerHr, end)
  338. queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, env.GetPromClusterLabel(), durStr, resStr)
  339. resChLBActiveMins := ctx.QueryAtTime(queryLBActiveMins, end)
  340. resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
  341. resCPURequests, _ := resChCPURequests.Await()
  342. resCPUUsageAvg, _ := resChCPUUsageAvg.Await()
  343. resCPUUsageMax, _ := resChCPUUsageMax.Await()
  344. resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
  345. resRAMRequests, _ := resChRAMRequests.Await()
  346. resRAMUsageAvg, _ := resChRAMUsageAvg.Await()
  347. resRAMUsageMax, _ := resChRAMUsageMax.Await()
  348. resGPUsRequested, _ := resChGPUsRequested.Await()
  349. resGPUsAllocated, _ := resChGPUsAllocated.Await()
  350. resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
  351. resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
  352. resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
  353. resNodeIsSpot, _ := resChNodeIsSpot.Await()
  354. resPVBytes, _ := resChPVBytes.Await()
  355. resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
  356. resPVCInfo, _ := resChPVCInfo.Await()
  357. resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
  358. resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
  359. resNetTransferBytes, _ := resChNetTransferBytes.Await()
  360. resNetReceiveBytes, _ := resChNetReceiveBytes.Await()
  361. resNetZoneGiB, _ := resChNetZoneGiB.Await()
  362. resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
  363. resNetRegionGiB, _ := resChNetRegionGiB.Await()
  364. resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
  365. resNetInternetGiB, _ := resChNetInternetGiB.Await()
  366. resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
  367. resNamespaceLabels, _ := resChNamespaceLabels.Await()
  368. resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
  369. resPodLabels, _ := resChPodLabels.Await()
  370. resPodAnnotations, _ := resChPodAnnotations.Await()
  371. resServiceLabels, _ := resChServiceLabels.Await()
  372. resDeploymentLabels, _ := resChDeploymentLabels.Await()
  373. resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
  374. resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
  375. resPodsWithReplicaSetOwner, _ := resChPodsWithReplicaSetOwner.Await()
  376. resReplicaSetsWithoutOwners, _ := resChReplicaSetsWithoutOwners.Await()
  377. resJobLabels, _ := resChJobLabels.Await()
  378. resLBCostPerHr, _ := resChLBCostPerHr.Await()
  379. resLBActiveMins, _ := resChLBActiveMins.Await()
  380. if ctx.HasErrors() {
  381. for _, err := range ctx.Errors() {
  382. log.Errorf("CostModel.ComputeAllocation: %s", err)
  383. }
  384. return allocSet, ctx.ErrorCollection()
  385. }
  386. // We choose to apply allocation before requests in the cases of RAM and
  387. // CPU so that we can assert that allocation should always be greater than
  388. // or equal to request.
  389. applyCPUCoresAllocated(podMap, resCPUCoresAllocated, podUIDKeyMap)
  390. applyCPUCoresRequested(podMap, resCPURequests, podUIDKeyMap)
  391. applyCPUCoresUsedAvg(podMap, resCPUUsageAvg, podUIDKeyMap)
  392. applyCPUCoresUsedMax(podMap, resCPUUsageMax, podUIDKeyMap)
  393. applyRAMBytesAllocated(podMap, resRAMBytesAllocated, podUIDKeyMap)
  394. applyRAMBytesRequested(podMap, resRAMRequests, podUIDKeyMap)
  395. applyRAMBytesUsedAvg(podMap, resRAMUsageAvg, podUIDKeyMap)
  396. applyRAMBytesUsedMax(podMap, resRAMUsageMax, podUIDKeyMap)
  397. applyGPUsAllocated(podMap, resGPUsRequested, resGPUsAllocated, podUIDKeyMap)
  398. applyNetworkTotals(podMap, resNetTransferBytes, resNetReceiveBytes, podUIDKeyMap)
  399. applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB, podUIDKeyMap)
  400. applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB, podUIDKeyMap)
  401. applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB, podUIDKeyMap)
  402. // In the case that two pods with the same name had different containers,
  403. // we will double-count the containers. There is no way to associate each
  404. // container with the proper pod from the usage metrics above. This will
  405. // show up as a pod having two Allocations running for the whole pod runtime.
  406. // Other than that case, Allocations should be associated with pods by the
  407. // above functions.
  408. namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
  409. podLabels := resToPodLabels(resPodLabels, podUIDKeyMap, ingestPodUID)
  410. namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
  411. podAnnotations := resToPodAnnotations(resPodAnnotations, podUIDKeyMap, ingestPodUID)
  412. applyLabels(podMap, namespaceLabels, podLabels)
  413. applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
  414. serviceLabels := getServiceLabels(resServiceLabels)
  415. allocsByService := map[serviceKey][]*kubecost.Allocation{}
  416. applyServicesToPods(podMap, podLabels, allocsByService, serviceLabels)
  417. podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
  418. podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
  419. podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels, podUIDKeyMap, ingestPodUID)
  420. podJobMap := resToPodJobMap(resJobLabels, podUIDKeyMap, ingestPodUID)
  421. podReplicaSetMap := resToPodReplicaSetMap(resPodsWithReplicaSetOwner, resReplicaSetsWithoutOwners, podUIDKeyMap, ingestPodUID)
  422. applyControllersToPods(podMap, podDeploymentMap)
  423. applyControllersToPods(podMap, podStatefulSetMap)
  424. applyControllersToPods(podMap, podDaemonSetMap)
  425. applyControllersToPods(podMap, podJobMap)
  426. applyControllersToPods(podMap, podReplicaSetMap)
  427. // TODO breakdown network costs?
  428. // Build out a map of Nodes with resource costs, discounts, and node types
  429. // for converting resource allocation data to cumulative costs.
  430. nodeMap := map[nodeKey]*NodePricing{}
  431. applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
  432. applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
  433. applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
  434. applyNodeSpot(nodeMap, resNodeIsSpot)
  435. applyNodeDiscount(nodeMap, cm)
  436. // Build out the map of all PVs with class, size and cost-per-hour.
  437. // Note: this does not record time running, which we may want to
  438. // include later for increased PV precision. (As long as the PV has
  439. // a PVC, we get time running there, so this is only inaccurate
  440. // for short-lived, unmounted PVs.)
  441. pvMap := map[pvKey]*PV{}
  442. buildPVMap(pvMap, resPVCostPerGiBHour)
  443. applyPVBytes(pvMap, resPVBytes)
  444. // Build out the map of all PVCs with time running, bytes requested,
  445. // and connect to the correct PV from pvMap. (If no PV exists, that
  446. // is noted, but does not result in any allocation/cost.)
  447. pvcMap := map[pvcKey]*PVC{}
  448. buildPVCMap(window, pvcMap, pvMap, resPVCInfo)
  449. applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
  450. // Build out the relationships of pods to their PVCs. This step
  451. // populates the PVC.Count field so that PVC allocation can be
  452. // split appropriately among each pod's container allocation.
  453. podPVCMap := map[podKey][]*PVC{}
  454. buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation, podUIDKeyMap, ingestPodUID)
  455. // Because PVCs can be shared among pods, the respective PV cost
  456. // needs to be evenly distributed to those pods based on time
  457. // running, as well as the amount of time the PVC was shared.
  458. // Build a relation between every PVC to the pods that mount it
  459. // and a window representing the interval during which they
  460. // were associated.
  461. pvcPodIntervalMap := make(map[pvcKey]map[podKey]kubecost.Window)
  462. for _, pod := range podMap {
  463. for _, alloc := range pod.Allocations {
  464. cluster := alloc.Properties.Cluster
  465. namespace := alloc.Properties.Namespace
  466. pod := alloc.Properties.Pod
  467. thisPodKey := newPodKey(cluster, namespace, pod)
  468. if pvcs, ok := podPVCMap[thisPodKey]; ok {
  469. for _, pvc := range pvcs {
  470. // Determine the (start, end) of the relationship between the
  471. // given PVC and the associated Allocation so that a precise
  472. // number of hours can be used to compute cumulative cost.
  473. s, e := alloc.Start, alloc.End
  474. if pvc.Start.After(alloc.Start) {
  475. s = pvc.Start
  476. }
  477. if pvc.End.Before(alloc.End) {
  478. e = pvc.End
  479. }
  480. thisPVCKey := newPVCKey(cluster, namespace, pvc.Name)
  481. if pvcPodIntervalMap[thisPVCKey] == nil {
  482. pvcPodIntervalMap[thisPVCKey] = make(map[podKey]kubecost.Window)
  483. }
  484. pvcPodIntervalMap[thisPVCKey][thisPodKey] = kubecost.NewWindow(&s, &e)
  485. }
  486. }
  487. // We only need to look at one alloc per pod
  488. break
  489. }
  490. }
  491. // Build out a PV price coefficient for each pod with a PVC. Each
  492. // PVC-pod relation needs a coefficient which modifies the PV cost
  493. // such that PV costs can be shared between all pods using that PVC.
  494. sharedPVCCostCoefficientMap := make(map[pvcKey]map[podKey][]CoefficientComponent)
  495. for pvcKey, podIntervalMap := range pvcPodIntervalMap {
  496. // Get single-point intervals from alloc-PVC relation windows.
  497. intervals := getIntervalPointsFromWindows(podIntervalMap)
  498. // Determine coefficients for each PVC-pod relation.
  499. sharedPVCCostCoefficientMap[pvcKey] = getPVCCostCoefficients(intervals, podIntervalMap)
  500. }
  501. // Identify unmounted PVs (PVs without PVCs) and add one Allocation per
  502. // cluster representing each cluster's unmounted PVs (if necessary).
  503. applyUnmountedPVs(window, podMap, pvMap, pvcMap)
  504. lbMap := getLoadBalancerCosts(resLBCostPerHr, resLBActiveMins, resolution)
  505. applyLoadBalancersToPods(lbMap, allocsByService)
  506. // (3) Build out AllocationSet from Pod map
  507. for _, pod := range podMap {
  508. for _, alloc := range pod.Allocations {
  509. cluster := alloc.Properties.Cluster
  510. nodeName := alloc.Properties.Node
  511. namespace := alloc.Properties.Namespace
  512. pod := alloc.Properties.Pod
  513. container := alloc.Properties.Container
  514. podKey := newPodKey(cluster, namespace, pod)
  515. nodeKey := newNodeKey(cluster, nodeName)
  516. node := cm.getNodePricing(nodeMap, nodeKey)
  517. alloc.Properties.ProviderID = node.ProviderID
  518. alloc.CPUCost = alloc.CPUCoreHours * node.CostPerCPUHr
  519. alloc.RAMCost = (alloc.RAMByteHours / 1024 / 1024 / 1024) * node.CostPerRAMGiBHr
  520. alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
  521. if pvcs, ok := podPVCMap[podKey]; ok {
  522. for _, pvc := range pvcs {
  523. pvcKey := newPVCKey(cluster, namespace, pvc.Name)
  524. s, e := alloc.Start, alloc.End
  525. if pvcInterval, ok := pvcPodIntervalMap[pvcKey][podKey]; ok {
  526. s, e = *pvcInterval.Start(), *pvcInterval.End()
  527. } else {
  528. log.Warnf("CostModel.ComputeAllocation: allocation %s and PVC %s have no associated active window", alloc.Name, pvc.Name)
  529. }
  530. minutes := e.Sub(s).Minutes()
  531. hrs := minutes / 60.0
  532. count := float64(pvc.Count)
  533. if pvc.Count < 1 {
  534. count = 1
  535. }
  536. gib := pvc.Bytes / 1024 / 1024 / 1024
  537. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  538. // Scale PV cost by PVC sharing coefficient.
  539. if coeffComponents, ok := sharedPVCCostCoefficientMap[pvcKey][podKey]; ok {
  540. cost *= getCoefficientFromComponents(coeffComponents)
  541. } else {
  542. log.Warnf("CostModel.ComputeAllocation: allocation %s and PVC %s have relation but no coeff", alloc.Name, pvc.Name)
  543. }
  544. // Apply the size and cost of the PV to the allocation, each
  545. // weighted by count (i.e. the number of containers in the pod)
  546. // record the amount of total PVBytes Hours attributable to a given PV
  547. if alloc.PVs == nil {
  548. alloc.PVs = kubecost.PVAllocations{}
  549. }
  550. pvKey := kubecost.PVKey{
  551. Cluster: pvc.Cluster,
  552. Name: pvc.Volume.Name,
  553. }
  554. alloc.PVs[pvKey] = &kubecost.PVAllocation{
  555. ByteHours: pvc.Bytes * hrs / count,
  556. Cost: cost / count,
  557. }
  558. }
  559. }
  560. // Make sure that the name is correct (node may not be present at this
  561. // point due to it missing from queryMinutes) then insert.
  562. alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, pod, container)
  563. allocSet.Set(alloc)
  564. }
  565. }
  566. return allocSet, nil
  567. }
  568. func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSize time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, ingestPodUID bool, podUIDKeyMap map[podKey][]podKey) error {
  569. // Assumes that window is positive and closed
  570. start, end := *window.Start(), *window.End()
  571. // Convert resolution duration to a query-ready string
  572. resStr := timeutil.DurationString(resolution)
  573. ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
  574. // Query for (start, end) by (pod, namespace, cluster) over the given
  575. // window, using the given resolution, and if necessary in batches no
  576. // larger than the given maximum batch size. If working in batches, track
  577. // overall progress by starting with (window.start, window.start) and
  578. // querying in batches no larger than maxBatchSize from start-to-end,
  579. // folding each result set into podMap as the results come back.
  580. coverage := kubecost.NewWindow(&start, &start)
  581. numQuery := 1
  582. for coverage.End().Before(end) {
  583. // Determine the (start, end) of the current batch
  584. batchStart := *coverage.End()
  585. batchEnd := coverage.End().Add(maxBatchSize)
  586. if batchEnd.After(end) {
  587. batchEnd = end
  588. }
  589. var resPods []*prom.QueryResult
  590. var err error
  591. maxTries := 3
  592. numTries := 0
  593. for resPods == nil && numTries < maxTries {
  594. numTries++
  595. // Query for the duration between start and end
  596. durStr := timeutil.DurationString(batchEnd.Sub(batchStart))
  597. if durStr == "" {
  598. // Negative duration, so set empty results and don't query
  599. resPods = []*prom.QueryResult{}
  600. err = nil
  601. break
  602. }
  603. // Submit and profile query
  604. var queryPods string
  605. // If ingesting UIDs, avg on them
  606. if ingestPodUID {
  607. queryPods = fmt.Sprintf(queryFmtPodsUID, env.GetPromClusterLabel(), durStr, resStr)
  608. } else {
  609. queryPods = fmt.Sprintf(queryFmtPods, env.GetPromClusterLabel(), durStr, resStr)
  610. }
  611. queryProfile := time.Now()
  612. resPods, err = ctx.QueryAtTime(queryPods, batchEnd).Await()
  613. if err != nil {
  614. log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
  615. resPods = nil
  616. }
  617. }
  618. if err != nil {
  619. return err
  620. }
  621. // queryFmtPodsUID will return both UID-containing results, and non-UID-containing results,
  622. // so filter out the non-containing results so we don't duplicate pods. This is due to the
  623. // default setup of Kubecost having replicated kube_pod_container_status_running and
  624. // included KSM kube_pod_container_status_running. Querying w/ UID will return both.
  625. if ingestPodUID {
  626. var resPodsUID []*prom.QueryResult
  627. for _, res := range resPods {
  628. _, err := res.GetString("uid")
  629. if err == nil {
  630. resPodsUID = append(resPodsUID, res)
  631. }
  632. }
  633. if len(resPodsUID) > 0 {
  634. resPods = resPodsUID
  635. } else {
  636. log.DedupedWarningf(5, "CostModel.ComputeAllocation: UID ingestion enabled, but query did not return any results with UID")
  637. }
  638. }
  639. applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods, ingestPodUID, podUIDKeyMap)
  640. coverage = coverage.ExpandEnd(batchEnd)
  641. numQuery++
  642. }
  643. return nil
  644. }
  645. func applyPodResults(window kubecost.Window, resolution time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, resPods []*prom.QueryResult, ingestPodUID bool, podUIDKeyMap map[podKey][]podKey) {
  646. for _, res := range resPods {
  647. if len(res.Values) == 0 {
  648. log.Warnf("CostModel.ComputeAllocation: empty minutes result")
  649. continue
  650. }
  651. cluster, err := res.GetString(env.GetPromClusterLabel())
  652. if err != nil {
  653. cluster = env.GetClusterID()
  654. }
  655. labels, err := res.GetStrings("namespace", "pod")
  656. if err != nil {
  657. log.Warnf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
  658. continue
  659. }
  660. namespace := labels["namespace"]
  661. pod := labels["pod"]
  662. key := newPodKey(cluster, namespace, pod)
  663. // If pod UIDs are being used to ID pods, append them to the pod name in
  664. // the podKey.
  665. if ingestPodUID {
  666. uid, err := res.GetString("uid")
  667. if err != nil {
  668. log.Warnf("CostModel.ComputeAllocation: UID ingestion enabled, but query result missing field: %s", err)
  669. } else {
  670. newKey := newPodKey(cluster, namespace, pod+" "+uid)
  671. podUIDKeyMap[key] = append(podUIDKeyMap[key], newKey)
  672. key = newKey
  673. }
  674. }
  675. // allocStart and allocEnd are the timestamps of the first and last
  676. // minutes the pod was running, respectively. We subtract one resolution
  677. // from allocStart because this point will actually represent the end
  678. // of the first minute. We don't subtract from allocEnd because it
  679. // already represents the end of the last minute.
  680. var allocStart, allocEnd time.Time
  681. startAdjustmentCoeff, endAdjustmentCoeff := 1.0, 1.0
  682. for _, datum := range res.Values {
  683. t := time.Unix(int64(datum.Timestamp), 0)
  684. if allocStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  685. // Set the start timestamp to the earliest non-zero timestamp
  686. allocStart = t
  687. // Record adjustment coefficient, i.e. the portion of the start
  688. // timestamp to "ignore". That is, sometimes the value will be
  689. // 0.5, meaning that we should discount the time running by
  690. // half of the resolution the timestamp stands for.
  691. startAdjustmentCoeff = (1.0 - datum.Value)
  692. }
  693. if datum.Value > 0 && window.Contains(t) {
  694. // Set the end timestamp to the latest non-zero timestamp
  695. allocEnd = t
  696. // Record adjustment coefficient, i.e. the portion of the end
  697. // timestamp to "ignore". (See explanation above for start.)
  698. endAdjustmentCoeff = (1.0 - datum.Value)
  699. }
  700. }
  701. if allocStart.IsZero() || allocEnd.IsZero() {
  702. continue
  703. }
  704. // Adjust timestamps according to the resolution and the adjustment
  705. // coefficients, as described above. That is, count the start timestamp
  706. // from the beginning of the resolution, not the end. Then "reduce" the
  707. // start and end by the correct amount, in the case that the "running"
  708. // value of the first or last timestamp was not a full 1.0.
  709. allocStart = allocStart.Add(-resolution)
  710. // Note: the *100 and /100 are necessary because Duration is an int, so
  711. // 0.5, for instance, will be truncated, resulting in no adjustment.
  712. allocStart = allocStart.Add(time.Duration(startAdjustmentCoeff*100) * resolution / time.Duration(100))
  713. allocEnd = allocEnd.Add(-time.Duration(endAdjustmentCoeff*100) * resolution / time.Duration(100))
  714. // Ensure that the allocStart is always within the window, adjusting
  715. // for the occasions where start falls 1m before the query window.
  716. // NOTE: window here will always be closed (so no need to nil check
  717. // "start").
  718. // TODO:CLEANUP revisit query methodology to figure out why this is
  719. // happening on occasion
  720. if allocStart.Before(*window.Start()) {
  721. allocStart = *window.Start()
  722. }
  723. // If there is only one point with a value <= 0.5 that the start and
  724. // end timestamps both share, then we will enter this case because at
  725. // least half of a resolution will be subtracted from both the start
  726. // and the end. If that is the case, then add back half of each side
  727. // so that the pod is said to run for half a resolution total.
  728. // e.g. For resolution 1m and a value of 0.5 at one timestamp, we'll
  729. // end up with allocEnd == allocStart and each coeff == 0.5. In
  730. // that case, add 0.25m to each side, resulting in 0.5m duration.
  731. if !allocEnd.After(allocStart) {
  732. allocStart = allocStart.Add(-time.Duration(50*startAdjustmentCoeff) * resolution / time.Duration(100))
  733. allocEnd = allocEnd.Add(time.Duration(50*endAdjustmentCoeff) * resolution / time.Duration(100))
  734. }
  735. // Ensure that the allocEnf is always within the window, adjusting
  736. // for the occasions where end falls 1m after the query window. This
  737. // has not ever happened, but is symmetrical with the start check
  738. // above.
  739. // NOTE: window here will always be closed (so no need to nil check
  740. // "end").
  741. // TODO:CLEANUP revisit query methodology to figure out why this is
  742. // happening on occasion
  743. if allocEnd.After(*window.End()) {
  744. allocEnd = *window.End()
  745. }
  746. // Set start if unset or this datum's start time is earlier than the
  747. // current earliest time.
  748. if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
  749. clusterStart[cluster] = allocStart
  750. }
  751. // Set end if unset or this datum's end time is later than the
  752. // current latest time.
  753. if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
  754. clusterEnd[cluster] = allocEnd
  755. }
  756. if pod, ok := podMap[key]; ok {
  757. // Pod has already been recorded, so update it accordingly
  758. if allocStart.Before(pod.Start) {
  759. pod.Start = allocStart
  760. }
  761. if allocEnd.After(pod.End) {
  762. pod.End = allocEnd
  763. }
  764. } else {
  765. // Pod has not been recorded yet, so insert it
  766. podMap[key] = &Pod{
  767. Window: window.Clone(),
  768. Start: allocStart,
  769. End: allocEnd,
  770. Key: key,
  771. Allocations: map[string]*kubecost.Allocation{},
  772. }
  773. }
  774. }
  775. }
  776. func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  777. for _, res := range resCPUCoresAllocated {
  778. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  779. if err != nil {
  780. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
  781. continue
  782. }
  783. container, err := res.GetString("container")
  784. if err != nil {
  785. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'container': %s", key)
  786. continue
  787. }
  788. var pods []*Pod
  789. pod, ok := podMap[key]
  790. if !ok {
  791. if uidKeys, ok := podUIDKeyMap[key]; ok {
  792. for _, uidKey := range uidKeys {
  793. pod, ok = podMap[uidKey]
  794. if ok {
  795. pods = append(pods, pod)
  796. }
  797. }
  798. } else {
  799. continue
  800. }
  801. } else {
  802. pods = []*Pod{pod}
  803. }
  804. for _, pod := range pods {
  805. if _, ok := pod.Allocations[container]; !ok {
  806. pod.AppendContainer(container)
  807. }
  808. cpuCores := res.Values[0].Value
  809. if cpuCores > MAX_CPU_CAP {
  810. log.Infof("[WARNING] Very large cpu allocation, clamping to %f", res.Values[0].Value*(pod.Allocations[container].Minutes()/60.0))
  811. cpuCores = 0.0
  812. }
  813. hours := pod.Allocations[container].Minutes() / 60.0
  814. pod.Allocations[container].CPUCoreHours = cpuCores * hours
  815. node, err := res.GetString("node")
  816. if err != nil {
  817. log.Warnf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
  818. continue
  819. }
  820. pod.Allocations[container].Properties.Node = node
  821. }
  822. }
  823. }
  824. func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  825. for _, res := range resCPUCoresRequested {
  826. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  827. if err != nil {
  828. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
  829. continue
  830. }
  831. container, err := res.GetString("container")
  832. if err != nil {
  833. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'container': %s", key)
  834. continue
  835. }
  836. var pods []*Pod
  837. pod, ok := podMap[key]
  838. if !ok {
  839. if uidKeys, ok := podUIDKeyMap[key]; ok {
  840. for _, uidKey := range uidKeys {
  841. pod, ok = podMap[uidKey]
  842. if ok {
  843. pods = append(pods, pod)
  844. }
  845. }
  846. } else {
  847. continue
  848. }
  849. } else {
  850. pods = []*Pod{pod}
  851. }
  852. for _, pod := range pods {
  853. if _, ok := pod.Allocations[container]; !ok {
  854. pod.AppendContainer(container)
  855. }
  856. pod.Allocations[container].CPUCoreRequestAverage = res.Values[0].Value
  857. // If CPU allocation is less than requests, set CPUCoreHours to
  858. // request level.
  859. if pod.Allocations[container].CPUCores() < res.Values[0].Value {
  860. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  861. }
  862. if pod.Allocations[container].CPUCores() > MAX_CPU_CAP {
  863. log.Infof("[WARNING] Very large cpu allocation, clamping! to %f", res.Values[0].Value*(pod.Allocations[container].Minutes()/60.0))
  864. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  865. }
  866. node, err := res.GetString("node")
  867. if err != nil {
  868. log.Warnf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
  869. continue
  870. }
  871. pod.Allocations[container].Properties.Node = node
  872. }
  873. }
  874. }
  875. func applyCPUCoresUsedAvg(podMap map[podKey]*Pod, resCPUCoresUsedAvg []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  876. for _, res := range resCPUCoresUsedAvg {
  877. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  878. if err != nil {
  879. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg result missing field: %s", err)
  880. continue
  881. }
  882. container, err := res.GetString("container")
  883. if container == "" || err != nil {
  884. container, err = res.GetString("container_name")
  885. if err != nil {
  886. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg query result missing 'container': %s", key)
  887. continue
  888. }
  889. }
  890. var pods []*Pod
  891. pod, ok := podMap[key]
  892. if !ok {
  893. if uidKeys, ok := podUIDKeyMap[key]; ok {
  894. for _, uidKey := range uidKeys {
  895. pod, ok = podMap[uidKey]
  896. if ok {
  897. pods = append(pods, pod)
  898. }
  899. }
  900. } else {
  901. continue
  902. }
  903. } else {
  904. pods = []*Pod{pod}
  905. }
  906. for _, pod := range pods {
  907. if _, ok := pod.Allocations[container]; !ok {
  908. pod.AppendContainer(container)
  909. }
  910. pod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
  911. if res.Values[0].Value > MAX_CPU_CAP {
  912. log.Infof("[WARNING] Very large cpu USAGE, dropping outlier")
  913. pod.Allocations[container].CPUCoreUsageAverage = 0.0
  914. }
  915. }
  916. }
  917. }
  918. func applyCPUCoresUsedMax(podMap map[podKey]*Pod, resCPUCoresUsedMax []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  919. for _, res := range resCPUCoresUsedMax {
  920. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  921. if err != nil {
  922. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max result missing field: %s", err)
  923. continue
  924. }
  925. container, err := res.GetString("container")
  926. if container == "" || err != nil {
  927. container, err = res.GetString("container_name")
  928. if err != nil {
  929. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max query result missing 'container': %s", key)
  930. continue
  931. }
  932. }
  933. var pods []*Pod
  934. pod, ok := podMap[key]
  935. if !ok {
  936. if uidKeys, ok := podUIDKeyMap[key]; ok {
  937. for _, uidKey := range uidKeys {
  938. pod, ok = podMap[uidKey]
  939. if ok {
  940. pods = append(pods, pod)
  941. }
  942. }
  943. } else {
  944. continue
  945. }
  946. } else {
  947. pods = []*Pod{pod}
  948. }
  949. for _, pod := range pods {
  950. if _, ok := pod.Allocations[container]; !ok {
  951. pod.AppendContainer(container)
  952. }
  953. if pod.Allocations[container].RawAllocationOnly == nil {
  954. pod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  955. CPUCoreUsageMax: res.Values[0].Value,
  956. }
  957. } else {
  958. pod.Allocations[container].RawAllocationOnly.CPUCoreUsageMax = res.Values[0].Value
  959. }
  960. }
  961. }
  962. }
  963. func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  964. for _, res := range resRAMBytesAllocated {
  965. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  966. if err != nil {
  967. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
  968. continue
  969. }
  970. container, err := res.GetString("container")
  971. if err != nil {
  972. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'container': %s", key)
  973. continue
  974. }
  975. var pods []*Pod
  976. pod, ok := podMap[key]
  977. if !ok {
  978. if uidKeys, ok := podUIDKeyMap[key]; ok {
  979. for _, uidKey := range uidKeys {
  980. pod, ok = podMap[uidKey]
  981. if ok {
  982. pods = append(pods, pod)
  983. }
  984. }
  985. } else {
  986. continue
  987. }
  988. } else {
  989. pods = []*Pod{pod}
  990. }
  991. for _, pod := range pods {
  992. if _, ok := pod.Allocations[container]; !ok {
  993. pod.AppendContainer(container)
  994. }
  995. ramBytes := res.Values[0].Value
  996. hours := pod.Allocations[container].Minutes() / 60.0
  997. pod.Allocations[container].RAMByteHours = ramBytes * hours
  998. node, err := res.GetString("node")
  999. if err != nil {
  1000. log.Warnf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
  1001. continue
  1002. }
  1003. pod.Allocations[container].Properties.Node = node
  1004. }
  1005. }
  1006. }
  1007. func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  1008. for _, res := range resRAMBytesRequested {
  1009. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1010. if err != nil {
  1011. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
  1012. continue
  1013. }
  1014. container, err := res.GetString("container")
  1015. if err != nil {
  1016. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'container': %s", key)
  1017. continue
  1018. }
  1019. var pods []*Pod
  1020. pod, ok := podMap[key]
  1021. if !ok {
  1022. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1023. for _, uidKey := range uidKeys {
  1024. pod, ok = podMap[uidKey]
  1025. if ok {
  1026. pods = append(pods, pod)
  1027. }
  1028. }
  1029. } else {
  1030. continue
  1031. }
  1032. } else {
  1033. pods = []*Pod{pod}
  1034. }
  1035. for _, pod := range pods {
  1036. if _, ok := pod.Allocations[container]; !ok {
  1037. pod.AppendContainer(container)
  1038. }
  1039. pod.Allocations[container].RAMBytesRequestAverage = res.Values[0].Value
  1040. // If RAM allocation is less than requests, set RAMByteHours to
  1041. // request level.
  1042. if pod.Allocations[container].RAMBytes() < res.Values[0].Value {
  1043. pod.Allocations[container].RAMByteHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  1044. }
  1045. node, err := res.GetString("node")
  1046. if err != nil {
  1047. log.Warnf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
  1048. continue
  1049. }
  1050. pod.Allocations[container].Properties.Node = node
  1051. }
  1052. }
  1053. }
  1054. func applyRAMBytesUsedAvg(podMap map[podKey]*Pod, resRAMBytesUsedAvg []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  1055. for _, res := range resRAMBytesUsedAvg {
  1056. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1057. if err != nil {
  1058. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM avg usage result missing field: %s", err)
  1059. continue
  1060. }
  1061. container, err := res.GetString("container")
  1062. if container == "" || err != nil {
  1063. container, err = res.GetString("container_name")
  1064. if err != nil {
  1065. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage avg query result missing 'container': %s", key)
  1066. continue
  1067. }
  1068. }
  1069. var pods []*Pod
  1070. pod, ok := podMap[key]
  1071. if !ok {
  1072. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1073. for _, uidKey := range uidKeys {
  1074. pod, ok = podMap[uidKey]
  1075. if ok {
  1076. pods = append(pods, pod)
  1077. }
  1078. }
  1079. } else {
  1080. continue
  1081. }
  1082. } else {
  1083. pods = []*Pod{pod}
  1084. }
  1085. for _, pod := range pods {
  1086. if _, ok := pod.Allocations[container]; !ok {
  1087. pod.AppendContainer(container)
  1088. }
  1089. pod.Allocations[container].RAMBytesUsageAverage = res.Values[0].Value
  1090. }
  1091. }
  1092. }
  1093. func applyRAMBytesUsedMax(podMap map[podKey]*Pod, resRAMBytesUsedMax []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  1094. for _, res := range resRAMBytesUsedMax {
  1095. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1096. if err != nil {
  1097. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max result missing field: %s", err)
  1098. continue
  1099. }
  1100. container, err := res.GetString("container")
  1101. if container == "" || err != nil {
  1102. container, err = res.GetString("container_name")
  1103. if err != nil {
  1104. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max query result missing 'container': %s", key)
  1105. continue
  1106. }
  1107. }
  1108. var pods []*Pod
  1109. pod, ok := podMap[key]
  1110. if !ok {
  1111. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1112. for _, uidKey := range uidKeys {
  1113. pod, ok = podMap[uidKey]
  1114. if ok {
  1115. pods = append(pods, pod)
  1116. }
  1117. }
  1118. } else {
  1119. continue
  1120. }
  1121. } else {
  1122. pods = []*Pod{pod}
  1123. }
  1124. for _, pod := range pods {
  1125. if _, ok := pod.Allocations[container]; !ok {
  1126. pod.AppendContainer(container)
  1127. }
  1128. if pod.Allocations[container].RawAllocationOnly == nil {
  1129. pod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  1130. RAMBytesUsageMax: res.Values[0].Value,
  1131. }
  1132. } else {
  1133. pod.Allocations[container].RawAllocationOnly.RAMBytesUsageMax = res.Values[0].Value
  1134. }
  1135. }
  1136. }
  1137. }
  1138. func applyGPUsAllocated(podMap map[podKey]*Pod, resGPUsRequested []*prom.QueryResult, resGPUsAllocated []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  1139. if len(resGPUsAllocated) > 0 { // Use the new query, when it's become available in a window
  1140. resGPUsRequested = resGPUsAllocated
  1141. }
  1142. for _, res := range resGPUsRequested {
  1143. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1144. if err != nil {
  1145. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
  1146. continue
  1147. }
  1148. container, err := res.GetString("container")
  1149. if err != nil {
  1150. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request query result missing 'container': %s", key)
  1151. continue
  1152. }
  1153. var pods []*Pod
  1154. pod, ok := podMap[key]
  1155. if !ok {
  1156. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1157. for _, uidKey := range uidKeys {
  1158. pod, ok = podMap[uidKey]
  1159. if ok {
  1160. pods = append(pods, pod)
  1161. }
  1162. }
  1163. } else {
  1164. continue
  1165. }
  1166. } else {
  1167. pods = []*Pod{pod}
  1168. }
  1169. for _, pod := range pods {
  1170. if _, ok := pod.Allocations[container]; !ok {
  1171. pod.AppendContainer(container)
  1172. }
  1173. hrs := pod.Allocations[container].Minutes() / 60.0
  1174. pod.Allocations[container].GPUHours = res.Values[0].Value * hrs
  1175. }
  1176. }
  1177. }
  1178. func applyNetworkTotals(podMap map[podKey]*Pod, resNetworkTransferBytes []*prom.QueryResult, resNetworkReceiveBytes []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  1179. for _, res := range resNetworkTransferBytes {
  1180. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1181. if err != nil {
  1182. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Transfer Bytes query result missing field: %s", err)
  1183. continue
  1184. }
  1185. var pods []*Pod
  1186. pod, ok := podMap[podKey]
  1187. if !ok {
  1188. if uidKeys, ok := podUIDKeyMap[podKey]; ok {
  1189. for _, uidKey := range uidKeys {
  1190. pod, ok = podMap[uidKey]
  1191. if ok {
  1192. pods = append(pods, pod)
  1193. }
  1194. }
  1195. } else {
  1196. continue
  1197. }
  1198. } else {
  1199. pods = []*Pod{pod}
  1200. }
  1201. for _, pod := range pods {
  1202. for _, alloc := range pod.Allocations {
  1203. alloc.NetworkTransferBytes = res.Values[0].Value / float64(len(pod.Allocations)) / float64(len(pods))
  1204. }
  1205. }
  1206. }
  1207. for _, res := range resNetworkReceiveBytes {
  1208. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1209. if err != nil {
  1210. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Receive Bytes query result missing field: %s", err)
  1211. continue
  1212. }
  1213. var pods []*Pod
  1214. pod, ok := podMap[podKey]
  1215. if !ok {
  1216. if uidKeys, ok := podUIDKeyMap[podKey]; ok {
  1217. for _, uidKey := range uidKeys {
  1218. pod, ok = podMap[uidKey]
  1219. if ok {
  1220. pods = append(pods, pod)
  1221. }
  1222. }
  1223. } else {
  1224. continue
  1225. }
  1226. } else {
  1227. pods = []*Pod{pod}
  1228. }
  1229. for _, pod := range pods {
  1230. for _, alloc := range pod.Allocations {
  1231. alloc.NetworkReceiveBytes = res.Values[0].Value / float64(len(pod.Allocations)) / float64(len(pods))
  1232. }
  1233. }
  1234. }
  1235. }
  1236. func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  1237. costPerGiBByCluster := map[string]float64{}
  1238. for _, res := range resNetworkCostPerGiB {
  1239. cluster, err := res.GetString(env.GetPromClusterLabel())
  1240. if err != nil {
  1241. cluster = env.GetClusterID()
  1242. }
  1243. costPerGiBByCluster[cluster] = res.Values[0].Value
  1244. }
  1245. for _, res := range resNetworkGiB {
  1246. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1247. if err != nil {
  1248. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
  1249. continue
  1250. }
  1251. var pods []*Pod
  1252. pod, ok := podMap[podKey]
  1253. if !ok {
  1254. if uidKeys, ok := podUIDKeyMap[podKey]; ok {
  1255. for _, uidKey := range uidKeys {
  1256. pod, ok = podMap[uidKey]
  1257. if ok {
  1258. pods = append(pods, pod)
  1259. }
  1260. }
  1261. } else {
  1262. continue
  1263. }
  1264. } else {
  1265. pods = []*Pod{pod}
  1266. }
  1267. for _, pod := range pods {
  1268. for _, alloc := range pod.Allocations {
  1269. gib := res.Values[0].Value / float64(len(pod.Allocations))
  1270. costPerGiB := costPerGiBByCluster[podKey.Cluster]
  1271. alloc.NetworkCost = gib * costPerGiB / float64(len(pods))
  1272. }
  1273. }
  1274. }
  1275. }
  1276. func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[namespaceKey]map[string]string {
  1277. namespaceLabels := map[namespaceKey]map[string]string{}
  1278. for _, res := range resNamespaceLabels {
  1279. nsKey, err := resultNamespaceKey(res, env.GetPromClusterLabel(), "namespace")
  1280. if err != nil {
  1281. continue
  1282. }
  1283. if _, ok := namespaceLabels[nsKey]; !ok {
  1284. namespaceLabels[nsKey] = map[string]string{}
  1285. }
  1286. for k, l := range res.GetLabels() {
  1287. namespaceLabels[nsKey][k] = l
  1288. }
  1289. }
  1290. return namespaceLabels
  1291. }
  1292. func resToPodLabels(resPodLabels []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) map[podKey]map[string]string {
  1293. podLabels := map[podKey]map[string]string{}
  1294. for _, res := range resPodLabels {
  1295. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1296. if err != nil {
  1297. continue
  1298. }
  1299. var keys []podKey
  1300. if ingestPodUID {
  1301. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1302. keys = append(keys, uidKeys...)
  1303. }
  1304. } else {
  1305. keys = []podKey{key}
  1306. }
  1307. for _, key := range keys {
  1308. if _, ok := podLabels[key]; !ok {
  1309. podLabels[key] = map[string]string{}
  1310. }
  1311. for k, l := range res.GetLabels() {
  1312. podLabels[key][k] = l
  1313. }
  1314. }
  1315. }
  1316. return podLabels
  1317. }
  1318. func resToNamespaceAnnotations(resNamespaceAnnotations []*prom.QueryResult) map[string]map[string]string {
  1319. namespaceAnnotations := map[string]map[string]string{}
  1320. for _, res := range resNamespaceAnnotations {
  1321. namespace, err := res.GetString("namespace")
  1322. if err != nil {
  1323. continue
  1324. }
  1325. if _, ok := namespaceAnnotations[namespace]; !ok {
  1326. namespaceAnnotations[namespace] = map[string]string{}
  1327. }
  1328. for k, l := range res.GetAnnotations() {
  1329. namespaceAnnotations[namespace][k] = l
  1330. }
  1331. }
  1332. return namespaceAnnotations
  1333. }
  1334. func resToPodAnnotations(resPodAnnotations []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) map[podKey]map[string]string {
  1335. podAnnotations := map[podKey]map[string]string{}
  1336. for _, res := range resPodAnnotations {
  1337. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  1338. if err != nil {
  1339. continue
  1340. }
  1341. var keys []podKey
  1342. if ingestPodUID {
  1343. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1344. keys = append(keys, uidKeys...)
  1345. }
  1346. } else {
  1347. keys = []podKey{key}
  1348. }
  1349. for _, key := range keys {
  1350. if _, ok := podAnnotations[key]; !ok {
  1351. podAnnotations[key] = map[string]string{}
  1352. }
  1353. for k, l := range res.GetAnnotations() {
  1354. podAnnotations[key][k] = l
  1355. }
  1356. }
  1357. }
  1358. return podAnnotations
  1359. }
  1360. func applyLabels(podMap map[podKey]*Pod, namespaceLabels map[namespaceKey]map[string]string, podLabels map[podKey]map[string]string) {
  1361. for podKey, pod := range podMap {
  1362. for _, alloc := range pod.Allocations {
  1363. allocLabels := alloc.Properties.Labels
  1364. if allocLabels == nil {
  1365. allocLabels = make(map[string]string)
  1366. }
  1367. // Apply namespace labels first, then pod labels so that pod labels
  1368. // overwrite namespace labels.
  1369. nsKey := podKey.namespaceKey // newNamespaceKey(podKey.Cluster, podKey.Namespace)
  1370. if labels, ok := namespaceLabels[nsKey]; ok {
  1371. for k, v := range labels {
  1372. allocLabels[k] = v
  1373. }
  1374. }
  1375. if labels, ok := podLabels[podKey]; ok {
  1376. for k, v := range labels {
  1377. allocLabels[k] = v
  1378. }
  1379. }
  1380. alloc.Properties.Labels = allocLabels
  1381. }
  1382. }
  1383. }
  1384. func applyAnnotations(podMap map[podKey]*Pod, namespaceAnnotations map[string]map[string]string, podAnnotations map[podKey]map[string]string) {
  1385. for key, pod := range podMap {
  1386. for _, alloc := range pod.Allocations {
  1387. allocAnnotations := alloc.Properties.Annotations
  1388. if allocAnnotations == nil {
  1389. allocAnnotations = make(map[string]string)
  1390. }
  1391. // Apply namespace annotations first, then pod annotations so that
  1392. // pod labels overwrite namespace labels.
  1393. if labels, ok := namespaceAnnotations[key.Namespace]; ok {
  1394. for k, v := range labels {
  1395. allocAnnotations[k] = v
  1396. }
  1397. }
  1398. if labels, ok := podAnnotations[key]; ok {
  1399. for k, v := range labels {
  1400. allocAnnotations[k] = v
  1401. }
  1402. }
  1403. alloc.Properties.Annotations = allocAnnotations
  1404. }
  1405. }
  1406. }
  1407. func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[string]string {
  1408. serviceLabels := map[serviceKey]map[string]string{}
  1409. for _, res := range resServiceLabels {
  1410. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service")
  1411. if err != nil {
  1412. continue
  1413. }
  1414. if _, ok := serviceLabels[serviceKey]; !ok {
  1415. serviceLabels[serviceKey] = map[string]string{}
  1416. }
  1417. for k, l := range res.GetLabels() {
  1418. serviceLabels[serviceKey][k] = l
  1419. }
  1420. }
  1421. // Prune duplicate services. That is, if the same service exists with
  1422. // hyphens instead of underscores, keep the one that uses hyphens.
  1423. for key := range serviceLabels {
  1424. if strings.Contains(key.Service, "_") {
  1425. duplicateService := strings.Replace(key.Service, "_", "-", -1)
  1426. duplicateKey := newServiceKey(key.Cluster, key.Namespace, duplicateService)
  1427. if _, ok := serviceLabels[duplicateKey]; ok {
  1428. delete(serviceLabels, key)
  1429. }
  1430. }
  1431. }
  1432. return serviceLabels
  1433. }
  1434. func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  1435. deploymentLabels := map[controllerKey]map[string]string{}
  1436. for _, res := range resDeploymentLabels {
  1437. controllerKey, err := resultDeploymentKey(res, env.GetPromClusterLabel(), "namespace", "deployment")
  1438. if err != nil {
  1439. continue
  1440. }
  1441. if _, ok := deploymentLabels[controllerKey]; !ok {
  1442. deploymentLabels[controllerKey] = map[string]string{}
  1443. }
  1444. for k, l := range res.GetLabels() {
  1445. deploymentLabels[controllerKey][k] = l
  1446. }
  1447. }
  1448. // Prune duplicate deployments. That is, if the same deployment exists with
  1449. // hyphens instead of underscores, keep the one that uses hyphens.
  1450. for key := range deploymentLabels {
  1451. if strings.Contains(key.Controller, "_") {
  1452. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  1453. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  1454. if _, ok := deploymentLabels[duplicateKey]; ok {
  1455. delete(deploymentLabels, key)
  1456. }
  1457. }
  1458. }
  1459. return deploymentLabels
  1460. }
  1461. func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  1462. statefulSetLabels := map[controllerKey]map[string]string{}
  1463. for _, res := range resStatefulSetLabels {
  1464. controllerKey, err := resultStatefulSetKey(res, env.GetPromClusterLabel(), "namespace", "statefulSet")
  1465. if err != nil {
  1466. continue
  1467. }
  1468. if _, ok := statefulSetLabels[controllerKey]; !ok {
  1469. statefulSetLabels[controllerKey] = map[string]string{}
  1470. }
  1471. for k, l := range res.GetLabels() {
  1472. statefulSetLabels[controllerKey][k] = l
  1473. }
  1474. }
  1475. // Prune duplicate stateful sets. That is, if the same stateful set exists
  1476. // with hyphens instead of underscores, keep the one that uses hyphens.
  1477. for key := range statefulSetLabels {
  1478. if strings.Contains(key.Controller, "_") {
  1479. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  1480. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  1481. if _, ok := statefulSetLabels[duplicateKey]; ok {
  1482. delete(statefulSetLabels, key)
  1483. }
  1484. }
  1485. }
  1486. return statefulSetLabels
  1487. }
  1488. func labelsToPodControllerMap(podLabels map[podKey]map[string]string, controllerLabels map[controllerKey]map[string]string) map[podKey]controllerKey {
  1489. podControllerMap := map[podKey]controllerKey{}
  1490. // For each controller, turn the labels into a selector and attempt to
  1491. // match it with each set of pod labels. A match indicates that the pod
  1492. // belongs to the controller.
  1493. for cKey, cLabels := range controllerLabels {
  1494. selector := labels.Set(cLabels).AsSelectorPreValidated()
  1495. for pKey, pLabels := range podLabels {
  1496. // If the pod is in a different cluster or namespace, there is
  1497. // no need to compare the labels.
  1498. if cKey.Cluster != pKey.Cluster || cKey.Namespace != pKey.Namespace {
  1499. continue
  1500. }
  1501. podLabelSet := labels.Set(pLabels)
  1502. if selector.Matches(podLabelSet) {
  1503. if _, ok := podControllerMap[pKey]; ok {
  1504. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PodControllerMap match already exists: %s matches %s and %s", pKey, podControllerMap[pKey], cKey)
  1505. }
  1506. podControllerMap[pKey] = cKey
  1507. }
  1508. }
  1509. }
  1510. return podControllerMap
  1511. }
  1512. func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) map[podKey]controllerKey {
  1513. daemonSetLabels := map[podKey]controllerKey{}
  1514. for _, res := range resDaemonSetLabels {
  1515. controllerKey, err := resultDaemonSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1516. if err != nil {
  1517. continue
  1518. }
  1519. pod, err := res.GetString("pod")
  1520. if err != nil {
  1521. log.Warnf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
  1522. }
  1523. key := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1524. var keys []podKey
  1525. if ingestPodUID {
  1526. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1527. keys = append(keys, uidKeys...)
  1528. }
  1529. } else {
  1530. keys = []podKey{key}
  1531. }
  1532. for _, key := range keys {
  1533. daemonSetLabels[key] = controllerKey
  1534. }
  1535. }
  1536. return daemonSetLabels
  1537. }
  1538. func resToPodJobMap(resJobLabels []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) map[podKey]controllerKey {
  1539. jobLabels := map[podKey]controllerKey{}
  1540. for _, res := range resJobLabels {
  1541. controllerKey, err := resultJobKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1542. if err != nil {
  1543. continue
  1544. }
  1545. // Convert the name of Jobs generated by CronJobs to the name of the
  1546. // CronJob by stripping the timestamp off the end.
  1547. match := isCron.FindStringSubmatch(controllerKey.Controller)
  1548. if match != nil {
  1549. controllerKey.Controller = match[1]
  1550. }
  1551. pod, err := res.GetString("pod")
  1552. if err != nil {
  1553. log.Warnf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
  1554. }
  1555. key := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1556. var keys []podKey
  1557. if ingestPodUID {
  1558. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1559. keys = append(keys, uidKeys...)
  1560. }
  1561. } else {
  1562. keys = []podKey{key}
  1563. }
  1564. for _, key := range keys {
  1565. jobLabels[key] = controllerKey
  1566. }
  1567. }
  1568. return jobLabels
  1569. }
  1570. func resToPodReplicaSetMap(resPodsWithReplicaSetOwner []*prom.QueryResult, resReplicaSetsWithoutOwners []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) map[podKey]controllerKey {
  1571. // Build out set of ReplicaSets that have no owners, themselves, such that
  1572. // the ReplicaSet should be used as the owner of the Pods it controls.
  1573. // (This should exclude, for example, ReplicaSets that are controlled by
  1574. // Deployments, in which case the Deployment should be the Pod's owner.)
  1575. replicaSets := map[controllerKey]struct{}{}
  1576. for _, res := range resReplicaSetsWithoutOwners {
  1577. controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "replicaset")
  1578. if err != nil {
  1579. continue
  1580. }
  1581. replicaSets[controllerKey] = struct{}{}
  1582. }
  1583. // Create the mapping of Pods to ReplicaSets, ignoring any ReplicaSets that
  1584. // to not appear in the set of uncontrolled ReplicaSets above.
  1585. podToReplicaSet := map[podKey]controllerKey{}
  1586. for _, res := range resPodsWithReplicaSetOwner {
  1587. controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1588. if err != nil {
  1589. continue
  1590. }
  1591. if _, ok := replicaSets[controllerKey]; !ok {
  1592. continue
  1593. }
  1594. pod, err := res.GetString("pod")
  1595. if err != nil {
  1596. log.Warnf("CostModel.ComputeAllocation: ReplicaSet result without pod: %s", controllerKey)
  1597. }
  1598. key := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1599. var keys []podKey
  1600. if ingestPodUID {
  1601. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1602. keys = append(keys, uidKeys...)
  1603. }
  1604. } else {
  1605. keys = []podKey{key}
  1606. }
  1607. for _, key := range keys {
  1608. podToReplicaSet[key] = controllerKey
  1609. }
  1610. }
  1611. return podToReplicaSet
  1612. }
  1613. func applyServicesToPods(podMap map[podKey]*Pod, podLabels map[podKey]map[string]string, allocsByService map[serviceKey][]*kubecost.Allocation, serviceLabels map[serviceKey]map[string]string) {
  1614. podServicesMap := map[podKey][]serviceKey{}
  1615. // For each service, turn the labels into a selector and attempt to
  1616. // match it with each set of pod labels. A match indicates that the pod
  1617. // belongs to the service.
  1618. for sKey, sLabels := range serviceLabels {
  1619. selector := labels.Set(sLabels).AsSelectorPreValidated()
  1620. for pKey, pLabels := range podLabels {
  1621. // If the pod is in a different cluster or namespace, there is
  1622. // no need to compare the labels.
  1623. if sKey.Cluster != pKey.Cluster || sKey.Namespace != pKey.Namespace {
  1624. continue
  1625. }
  1626. podLabelSet := labels.Set(pLabels)
  1627. if selector.Matches(podLabelSet) {
  1628. if _, ok := podServicesMap[pKey]; !ok {
  1629. podServicesMap[pKey] = []serviceKey{}
  1630. }
  1631. podServicesMap[pKey] = append(podServicesMap[pKey], sKey)
  1632. }
  1633. }
  1634. }
  1635. // For each allocation in each pod, attempt to find and apply the list of
  1636. // services associated with the allocation's pod.
  1637. for key, pod := range podMap {
  1638. for _, alloc := range pod.Allocations {
  1639. if sKeys, ok := podServicesMap[key]; ok {
  1640. services := []string{}
  1641. for _, sKey := range sKeys {
  1642. services = append(services, sKey.Service)
  1643. allocsByService[sKey] = append(allocsByService[sKey], alloc)
  1644. }
  1645. alloc.Properties.Services = services
  1646. }
  1647. }
  1648. }
  1649. }
  1650. func applyControllersToPods(podMap map[podKey]*Pod, podControllerMap map[podKey]controllerKey) {
  1651. for key, pod := range podMap {
  1652. for _, alloc := range pod.Allocations {
  1653. if controllerKey, ok := podControllerMap[key]; ok {
  1654. alloc.Properties.ControllerKind = controllerKey.ControllerKind
  1655. alloc.Properties.Controller = controllerKey.Controller
  1656. }
  1657. }
  1658. }
  1659. }
  1660. func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult) {
  1661. for _, res := range resNodeCostPerCPUHr {
  1662. cluster, err := res.GetString(env.GetPromClusterLabel())
  1663. if err != nil {
  1664. cluster = env.GetClusterID()
  1665. }
  1666. node, err := res.GetString("node")
  1667. if err != nil {
  1668. log.Warnf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1669. continue
  1670. }
  1671. instanceType, err := res.GetString("instance_type")
  1672. if err != nil {
  1673. log.Warnf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1674. continue
  1675. }
  1676. providerID, err := res.GetString("provider_id")
  1677. if err != nil {
  1678. log.Warnf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1679. continue
  1680. }
  1681. key := newNodeKey(cluster, node)
  1682. if _, ok := nodeMap[key]; !ok {
  1683. nodeMap[key] = &NodePricing{
  1684. Name: node,
  1685. NodeType: instanceType,
  1686. ProviderID: cloud.ParseID(providerID),
  1687. }
  1688. }
  1689. nodeMap[key].CostPerCPUHr = res.Values[0].Value
  1690. }
  1691. }
  1692. func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult) {
  1693. for _, res := range resNodeCostPerRAMGiBHr {
  1694. cluster, err := res.GetString(env.GetPromClusterLabel())
  1695. if err != nil {
  1696. cluster = env.GetClusterID()
  1697. }
  1698. node, err := res.GetString("node")
  1699. if err != nil {
  1700. log.Warnf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1701. continue
  1702. }
  1703. instanceType, err := res.GetString("instance_type")
  1704. if err != nil {
  1705. log.Warnf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1706. continue
  1707. }
  1708. providerID, err := res.GetString("provider_id")
  1709. if err != nil {
  1710. log.Warnf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1711. continue
  1712. }
  1713. key := newNodeKey(cluster, node)
  1714. if _, ok := nodeMap[key]; !ok {
  1715. nodeMap[key] = &NodePricing{
  1716. Name: node,
  1717. NodeType: instanceType,
  1718. ProviderID: cloud.ParseID(providerID),
  1719. }
  1720. }
  1721. nodeMap[key].CostPerRAMGiBHr = res.Values[0].Value
  1722. }
  1723. }
  1724. func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult) {
  1725. for _, res := range resNodeCostPerGPUHr {
  1726. cluster, err := res.GetString(env.GetPromClusterLabel())
  1727. if err != nil {
  1728. cluster = env.GetClusterID()
  1729. }
  1730. node, err := res.GetString("node")
  1731. if err != nil {
  1732. log.Warnf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1733. continue
  1734. }
  1735. instanceType, err := res.GetString("instance_type")
  1736. if err != nil {
  1737. log.Warnf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1738. continue
  1739. }
  1740. providerID, err := res.GetString("provider_id")
  1741. if err != nil {
  1742. log.Warnf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1743. continue
  1744. }
  1745. key := newNodeKey(cluster, node)
  1746. if _, ok := nodeMap[key]; !ok {
  1747. nodeMap[key] = &NodePricing{
  1748. Name: node,
  1749. NodeType: instanceType,
  1750. ProviderID: cloud.ParseID(providerID),
  1751. }
  1752. }
  1753. nodeMap[key].CostPerGPUHr = res.Values[0].Value
  1754. }
  1755. }
  1756. func applyNodeSpot(nodeMap map[nodeKey]*NodePricing, resNodeIsSpot []*prom.QueryResult) {
  1757. for _, res := range resNodeIsSpot {
  1758. cluster, err := res.GetString(env.GetPromClusterLabel())
  1759. if err != nil {
  1760. cluster = env.GetClusterID()
  1761. }
  1762. node, err := res.GetString("node")
  1763. if err != nil {
  1764. log.Warnf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
  1765. continue
  1766. }
  1767. key := newNodeKey(cluster, node)
  1768. if _, ok := nodeMap[key]; !ok {
  1769. log.Warnf("CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
  1770. continue
  1771. }
  1772. nodeMap[key].Preemptible = res.Values[0].Value > 0
  1773. }
  1774. }
  1775. func applyNodeDiscount(nodeMap map[nodeKey]*NodePricing, cm *CostModel) {
  1776. if cm == nil {
  1777. return
  1778. }
  1779. c, err := cm.Provider.GetConfig()
  1780. if err != nil {
  1781. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1782. return
  1783. }
  1784. discount, err := ParsePercentString(c.Discount)
  1785. if err != nil {
  1786. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1787. return
  1788. }
  1789. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1790. if err != nil {
  1791. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1792. return
  1793. }
  1794. for _, node := range nodeMap {
  1795. // TODO GKE Reserved Instances into account
  1796. node.Discount = cm.Provider.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  1797. node.CostPerCPUHr *= (1.0 - node.Discount)
  1798. node.CostPerRAMGiBHr *= (1.0 - node.Discount)
  1799. }
  1800. }
  1801. func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
  1802. for _, res := range resPVCostPerGiBHour {
  1803. cluster, err := res.GetString(env.GetPromClusterLabel())
  1804. if err != nil {
  1805. cluster = env.GetClusterID()
  1806. }
  1807. name, err := res.GetString("volumename")
  1808. if err != nil {
  1809. log.Warnf("CostModel.ComputeAllocation: PV cost without volumename")
  1810. continue
  1811. }
  1812. key := newPVKey(cluster, name)
  1813. pvMap[key] = &PV{
  1814. Cluster: cluster,
  1815. Name: name,
  1816. CostPerGiBHour: res.Values[0].Value,
  1817. }
  1818. }
  1819. }
  1820. func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
  1821. for _, res := range resPVBytes {
  1822. key, err := resultPVKey(res, env.GetPromClusterLabel(), "persistentvolume")
  1823. if err != nil {
  1824. log.Warnf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
  1825. continue
  1826. }
  1827. if _, ok := pvMap[key]; !ok {
  1828. log.Warnf("CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
  1829. continue
  1830. }
  1831. pvMap[key].Bytes = res.Values[0].Value
  1832. }
  1833. }
  1834. func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey]*PV, resPVCInfo []*prom.QueryResult) {
  1835. for _, res := range resPVCInfo {
  1836. cluster, err := res.GetString(env.GetPromClusterLabel())
  1837. if err != nil {
  1838. cluster = env.GetClusterID()
  1839. }
  1840. values, err := res.GetStrings("persistentvolumeclaim", "storageclass", "volumename", "namespace")
  1841. if err != nil {
  1842. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC info query result missing field: %s", err)
  1843. continue
  1844. }
  1845. namespace := values["namespace"]
  1846. name := values["persistentvolumeclaim"]
  1847. volume := values["volumename"]
  1848. storageClass := values["storageclass"]
  1849. pvKey := newPVKey(cluster, volume)
  1850. pvcKey := newPVCKey(cluster, namespace, name)
  1851. // pvcStart and pvcEnd are the timestamps of the first and last minutes
  1852. // the PVC was running, respectively. We subtract 1m from pvcStart
  1853. // because this point will actually represent the end of the first
  1854. // minute. We don't subtract from pvcEnd because it already represents
  1855. // the end of the last minute.
  1856. var pvcStart, pvcEnd time.Time
  1857. for _, datum := range res.Values {
  1858. t := time.Unix(int64(datum.Timestamp), 0)
  1859. if pvcStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  1860. pvcStart = t
  1861. }
  1862. if datum.Value > 0 && window.Contains(t) {
  1863. pvcEnd = t
  1864. }
  1865. }
  1866. if pvcStart.IsZero() || pvcEnd.IsZero() {
  1867. log.Warnf("CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
  1868. }
  1869. pvcStart = pvcStart.Add(-time.Minute)
  1870. if _, ok := pvMap[pvKey]; !ok {
  1871. continue
  1872. }
  1873. pvMap[pvKey].StorageClass = storageClass
  1874. if _, ok := pvcMap[pvcKey]; !ok {
  1875. pvcMap[pvcKey] = &PVC{}
  1876. }
  1877. pvcMap[pvcKey].Name = name
  1878. pvcMap[pvcKey].Namespace = namespace
  1879. pvcMap[pvcKey].Volume = pvMap[pvKey]
  1880. pvcMap[pvcKey].Start = pvcStart
  1881. pvcMap[pvcKey].End = pvcEnd
  1882. }
  1883. }
  1884. func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom.QueryResult) {
  1885. for _, res := range resPVCBytesRequested {
  1886. key, err := resultPVCKey(res, env.GetPromClusterLabel(), "namespace", "persistentvolumeclaim")
  1887. if err != nil {
  1888. continue
  1889. }
  1890. if _, ok := pvcMap[key]; !ok {
  1891. continue
  1892. }
  1893. pvcMap[key].Bytes = res.Values[0].Value
  1894. }
  1895. }
  1896. func buildPodPVCMap(podPVCMap map[podKey][]*PVC, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC, podMap map[podKey]*Pod, resPodPVCAllocation []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) {
  1897. for _, res := range resPodPVCAllocation {
  1898. cluster, err := res.GetString(env.GetPromClusterLabel())
  1899. if err != nil {
  1900. cluster = env.GetClusterID()
  1901. }
  1902. values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace")
  1903. if err != nil {
  1904. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC allocation query result missing field: %s", err)
  1905. continue
  1906. }
  1907. namespace := values["namespace"]
  1908. pod := values["pod"]
  1909. name := values["persistentvolumeclaim"]
  1910. volume := values["persistentvolume"]
  1911. key := newPodKey(cluster, namespace, pod)
  1912. pvKey := newPVKey(cluster, volume)
  1913. pvcKey := newPVCKey(cluster, namespace, name)
  1914. var keys []podKey
  1915. if ingestPodUID {
  1916. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1917. keys = append(keys, uidKeys...)
  1918. }
  1919. } else {
  1920. keys = []podKey{key}
  1921. }
  1922. for _, key := range keys {
  1923. if _, ok := pvMap[pvKey]; !ok {
  1924. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PV missing for PVC allocation query result: %s", pvKey)
  1925. continue
  1926. }
  1927. if _, ok := podPVCMap[key]; !ok {
  1928. podPVCMap[key] = []*PVC{}
  1929. }
  1930. pvc, ok := pvcMap[pvcKey]
  1931. if !ok {
  1932. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC missing for PVC allocation query: %s", pvcKey)
  1933. continue
  1934. }
  1935. count := 1
  1936. if pod, ok := podMap[key]; ok && len(pod.Allocations) > 0 {
  1937. count = len(pod.Allocations)
  1938. } else {
  1939. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC %s for missing pod %s", pvcKey, key)
  1940. }
  1941. pvc.Count = count
  1942. pvc.Mounted = true
  1943. podPVCMap[key] = append(podPVCMap[key], pvc)
  1944. }
  1945. }
  1946. }
  1947. func applyUnmountedPVs(window kubecost.Window, podMap map[podKey]*Pod, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC) {
  1948. unmountedPVBytes := map[string]float64{}
  1949. unmountedPVCost := map[string]float64{}
  1950. for _, pv := range pvMap {
  1951. mounted := false
  1952. for _, pvc := range pvcMap {
  1953. if pvc.Volume == nil {
  1954. continue
  1955. }
  1956. if pvc.Volume == pv {
  1957. mounted = true
  1958. break
  1959. }
  1960. }
  1961. if !mounted {
  1962. gib := pv.Bytes / 1024 / 1024 / 1024
  1963. hrs := window.Minutes() / 60.0 // TODO improve with PV hours, not window hours
  1964. cost := pv.CostPerGiBHour * gib * hrs
  1965. unmountedPVCost[pv.Cluster] += cost
  1966. unmountedPVBytes[pv.Cluster] += pv.Bytes
  1967. }
  1968. }
  1969. for cluster, amount := range unmountedPVCost {
  1970. container := kubecost.UnmountedSuffix
  1971. pod := kubecost.UnmountedSuffix
  1972. namespace := kubecost.UnmountedSuffix
  1973. node := ""
  1974. key := newPodKey(cluster, namespace, pod)
  1975. podMap[key] = &Pod{
  1976. Window: window.Clone(),
  1977. Start: *window.Start(),
  1978. End: *window.End(),
  1979. Key: key,
  1980. Allocations: map[string]*kubecost.Allocation{},
  1981. }
  1982. podMap[key].AppendContainer(container)
  1983. podMap[key].Allocations[container].Properties.Cluster = cluster
  1984. podMap[key].Allocations[container].Properties.Node = node
  1985. podMap[key].Allocations[container].Properties.Namespace = namespace
  1986. podMap[key].Allocations[container].Properties.Pod = pod
  1987. podMap[key].Allocations[container].Properties.Container = container
  1988. pvKey := kubecost.PVKey{
  1989. Cluster: cluster,
  1990. Name: kubecost.UnmountedSuffix,
  1991. }
  1992. unmountedPVs := kubecost.PVAllocations{
  1993. pvKey: {
  1994. ByteHours: unmountedPVBytes[cluster] * window.Minutes() / 60.0,
  1995. Cost: amount,
  1996. },
  1997. }
  1998. podMap[key].Allocations[container].PVs = podMap[key].Allocations[container].PVs.Add(unmountedPVs)
  1999. }
  2000. }
  2001. func applyUnmountedPVCs(window kubecost.Window, podMap map[podKey]*Pod, pvcMap map[pvcKey]*PVC) {
  2002. unmountedPVCBytes := map[namespaceKey]float64{}
  2003. unmountedPVCCost := map[namespaceKey]float64{}
  2004. for _, pvc := range pvcMap {
  2005. if !pvc.Mounted && pvc.Volume != nil {
  2006. key := newNamespaceKey(pvc.Cluster, pvc.Namespace)
  2007. gib := pvc.Volume.Bytes / 1024 / 1024 / 1024
  2008. hrs := pvc.Minutes() / 60.0
  2009. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  2010. unmountedPVCCost[key] += cost
  2011. unmountedPVCBytes[key] += pvc.Volume.Bytes
  2012. }
  2013. }
  2014. for key, amount := range unmountedPVCCost {
  2015. container := kubecost.UnmountedSuffix
  2016. pod := kubecost.UnmountedSuffix
  2017. namespace := key.Namespace
  2018. node := ""
  2019. cluster := key.Cluster
  2020. podKey := newPodKey(cluster, namespace, pod)
  2021. podMap[podKey] = &Pod{
  2022. Window: window.Clone(),
  2023. Start: *window.Start(),
  2024. End: *window.End(),
  2025. Key: podKey,
  2026. Allocations: map[string]*kubecost.Allocation{},
  2027. }
  2028. podMap[podKey].AppendContainer(container)
  2029. podMap[podKey].Allocations[container].Properties.Cluster = cluster
  2030. podMap[podKey].Allocations[container].Properties.Node = node
  2031. podMap[podKey].Allocations[container].Properties.Namespace = namespace
  2032. podMap[podKey].Allocations[container].Properties.Pod = pod
  2033. podMap[podKey].Allocations[container].Properties.Container = container
  2034. pvKey := kubecost.PVKey{
  2035. Cluster: cluster,
  2036. Name: kubecost.UnmountedSuffix,
  2037. }
  2038. unmountedPVs := kubecost.PVAllocations{
  2039. pvKey: {
  2040. ByteHours: unmountedPVCBytes[key] * window.Minutes() / 60.0,
  2041. Cost: amount,
  2042. },
  2043. }
  2044. podMap[podKey].Allocations[container].PVs = podMap[podKey].Allocations[container].PVs.Add(unmountedPVs)
  2045. }
  2046. }
  2047. // LB describes the start and end time of a Load Balancer along with cost
  2048. type LB struct {
  2049. TotalCost float64
  2050. Start time.Time
  2051. End time.Time
  2052. }
  2053. func getLoadBalancerCosts(resLBCost, resLBActiveMins []*prom.QueryResult, resolution time.Duration) map[serviceKey]*LB {
  2054. lbMap := make(map[serviceKey]*LB)
  2055. lbHourlyCosts := make(map[serviceKey]float64)
  2056. for _, res := range resLBCost {
  2057. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
  2058. if err != nil {
  2059. continue
  2060. }
  2061. lbHourlyCosts[serviceKey] = res.Values[0].Value
  2062. }
  2063. for _, res := range resLBActiveMins {
  2064. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
  2065. if err != nil || len(res.Values) == 0 {
  2066. continue
  2067. }
  2068. if _, ok := lbHourlyCosts[serviceKey]; !ok {
  2069. log.Warnf("CostModel: failed to find hourly cost for Load Balancer: %v", serviceKey)
  2070. continue
  2071. }
  2072. s := time.Unix(int64(res.Values[0].Timestamp), 0)
  2073. // subtract resolution from start time to cover full time period
  2074. s = s.Add(-resolution)
  2075. e := time.Unix(int64(res.Values[len(res.Values)-1].Timestamp), 0)
  2076. hours := e.Sub(s).Hours()
  2077. lbMap[serviceKey] = &LB{
  2078. TotalCost: lbHourlyCosts[serviceKey] * hours,
  2079. Start: s,
  2080. End: e,
  2081. }
  2082. }
  2083. return lbMap
  2084. }
  2085. func applyLoadBalancersToPods(lbMap map[serviceKey]*LB, allocsByService map[serviceKey][]*kubecost.Allocation) {
  2086. for sKey, lb := range lbMap {
  2087. totalHours := 0.0
  2088. allocHours := make(map[*kubecost.Allocation]float64)
  2089. // Add portion of load balancing cost to each allocation
  2090. // proportional to the total number of hours allocations used the load balancer
  2091. for _, alloc := range allocsByService[sKey] {
  2092. // Determine the (start, end) of the relationship between the
  2093. // given LB and the associated Allocation so that a precise
  2094. // number of hours can be used to compute cumulative cost.
  2095. s, e := alloc.Start, alloc.End
  2096. if lb.Start.After(alloc.Start) {
  2097. s = lb.Start
  2098. }
  2099. if lb.End.Before(alloc.End) {
  2100. e = lb.End
  2101. }
  2102. hours := e.Sub(s).Hours()
  2103. // A negative number of hours signifies no overlap between the windows
  2104. if hours > 0 {
  2105. totalHours += hours
  2106. allocHours[alloc] = hours
  2107. }
  2108. }
  2109. // Distribute cost of service once total hours is calculated
  2110. for alloc, hours := range allocHours {
  2111. alloc.LoadBalancerCost += lb.TotalCost * hours / totalHours
  2112. }
  2113. }
  2114. }
  2115. // getNodePricing determines node pricing, given a key and a mapping from keys
  2116. // to their NodePricing instances, as well as the custom pricing configuration
  2117. // inherent to the CostModel instance. If custom pricing is set, use that. If
  2118. // not, use the pricing defined by the given key. If that doesn't exist, fall
  2119. // back on custom pricing as a default.
  2120. func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*NodePricing, nodeKey nodeKey) *NodePricing {
  2121. // Find the relevant NodePricing, if it exists. If not, substitute the
  2122. // custom NodePricing as a default.
  2123. node, ok := nodeMap[nodeKey]
  2124. if !ok || node == nil {
  2125. if nodeKey.Node != "" {
  2126. log.DedupedWarningf(5, "CostModel: failed to find node for %s", nodeKey)
  2127. }
  2128. return cm.getCustomNodePricing(false)
  2129. }
  2130. // If custom pricing is enabled and can be retrieved, override detected
  2131. // node pricing with the custom values.
  2132. customPricingConfig, err := cm.Provider.GetConfig()
  2133. if err != nil {
  2134. log.Warnf("CostModel: failed to load custom pricing: %s", err)
  2135. }
  2136. if cloud.CustomPricesEnabled(cm.Provider) && customPricingConfig != nil {
  2137. return cm.getCustomNodePricing(node.Preemptible)
  2138. }
  2139. node.Source = "prometheus"
  2140. // If any of the values are NaN or zero, replace them with the custom
  2141. // values as default.
  2142. // TODO:CLEANUP can't we parse these custom prices once? why do we store
  2143. // them as strings like this?
  2144. if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
  2145. log.Warnf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
  2146. cpuCostStr := customPricingConfig.CPU
  2147. if node.Preemptible {
  2148. cpuCostStr = customPricingConfig.SpotCPU
  2149. }
  2150. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  2151. if err != nil {
  2152. log.Warnf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  2153. }
  2154. node.CostPerCPUHr = costPerCPUHr
  2155. node.Source += "/customCPU"
  2156. }
  2157. if math.IsNaN(node.CostPerGPUHr) {
  2158. log.Warnf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
  2159. gpuCostStr := customPricingConfig.GPU
  2160. if node.Preemptible {
  2161. gpuCostStr = customPricingConfig.SpotGPU
  2162. }
  2163. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  2164. if err != nil {
  2165. log.Warnf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  2166. }
  2167. node.CostPerGPUHr = costPerGPUHr
  2168. node.Source += "/customGPU"
  2169. }
  2170. if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
  2171. log.Warnf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
  2172. ramCostStr := customPricingConfig.RAM
  2173. if node.Preemptible {
  2174. ramCostStr = customPricingConfig.SpotRAM
  2175. }
  2176. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  2177. if err != nil {
  2178. log.Warnf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  2179. }
  2180. node.CostPerRAMGiBHr = costPerRAMHr
  2181. node.Source += "/customRAM"
  2182. }
  2183. return node
  2184. }
  2185. // getCustomNodePricing converts the CostModel's configured custom pricing
  2186. // values into a NodePricing instance.
  2187. func (cm *CostModel) getCustomNodePricing(spot bool) *NodePricing {
  2188. customPricingConfig, err := cm.Provider.GetConfig()
  2189. if err != nil {
  2190. return nil
  2191. }
  2192. cpuCostStr := customPricingConfig.CPU
  2193. gpuCostStr := customPricingConfig.GPU
  2194. ramCostStr := customPricingConfig.RAM
  2195. if spot {
  2196. cpuCostStr = customPricingConfig.SpotCPU
  2197. gpuCostStr = customPricingConfig.SpotGPU
  2198. ramCostStr = customPricingConfig.SpotRAM
  2199. }
  2200. node := &NodePricing{Source: "custom"}
  2201. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  2202. if err != nil {
  2203. log.Warnf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  2204. }
  2205. node.CostPerCPUHr = costPerCPUHr
  2206. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  2207. if err != nil {
  2208. log.Warnf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  2209. }
  2210. node.CostPerGPUHr = costPerGPUHr
  2211. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  2212. if err != nil {
  2213. log.Warnf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  2214. }
  2215. node.CostPerRAMGiBHr = costPerRAMHr
  2216. return node
  2217. }
  2218. // NodePricing describes the resource costs associated with a given node, as
  2219. // well as the source of the information (e.g. prometheus, custom)
  2220. type NodePricing struct {
  2221. Name string
  2222. NodeType string
  2223. ProviderID string
  2224. Preemptible bool
  2225. CostPerCPUHr float64
  2226. CostPerRAMGiBHr float64
  2227. CostPerGPUHr float64
  2228. Discount float64
  2229. Source string
  2230. }
// Pod describes a running pod's start and end time within a Window and
// all the Allocations (i.e. containers) contained within it.
type Pod struct {
	// Window is cloned into every Allocation created by AppendContainer.
	Window kubecost.Window
	// Start and End are copied into every Allocation created by
	// AppendContainer.
	Start time.Time
	End time.Time
	// Key identifies the pod by cluster, namespace and pod name.
	Key podKey
	// Allocations maps a container name to that container's Allocation. It
	// must be non-nil before AppendContainer is called.
	Allocations map[string]*kubecost.Allocation
}
  2240. // AppendContainer adds an entry for the given container name to the Pod.
  2241. func (p Pod) AppendContainer(container string) {
  2242. name := fmt.Sprintf("%s/%s/%s/%s", p.Key.Cluster, p.Key.Namespace, p.Key.Pod, container)
  2243. alloc := &kubecost.Allocation{
  2244. Name: name,
  2245. Properties: &kubecost.AllocationProperties{},
  2246. Window: p.Window.Clone(),
  2247. Start: p.Start,
  2248. End: p.End,
  2249. }
  2250. alloc.Properties.Container = container
  2251. alloc.Properties.Pod = p.Key.Pod
  2252. alloc.Properties.Namespace = p.Key.Namespace
  2253. alloc.Properties.Cluster = p.Key.Cluster
  2254. p.Allocations[container] = alloc
  2255. }
// PVC describes a PersistentVolumeClaim
// TODO:CLEANUP move to pkg/kubecost?
// TODO:CLEANUP add PersistentVolumeClaims field to type Allocation?
type PVC struct {
	// Bytes is the size of the claim in bytes, as used by Cost.
	Bytes float64 `json:"bytes"`
	// Count is set by buildPodPVCMap to the number of allocations of the pod
	// the claim was associated with (1 if that pod is unknown or has none).
	Count int `json:"count"`
	// Name is the name of the claim.
	Name string `json:"name"`
	// Cluster is the cluster the claim belongs to.
	Cluster string `json:"cluster"`
	// Namespace is the namespace the claim belongs to.
	Namespace string `json:"namespace"`
	// Volume is the PV the claim is bound to; nil if unknown.
	Volume *PV `json:"persistentVolume"`
	// Mounted is set to true by buildPodPVCMap once the claim has been
	// associated with a pod.
	Mounted bool `json:"mounted"`
	// Start and End bound the period over which the claim is defined;
	// Minutes reports the length of that period.
	Start time.Time `json:"start"`
	End time.Time `json:"end"`
}
  2270. // Cost computes the cumulative cost of the PVC
  2271. func (pvc *PVC) Cost() float64 {
  2272. if pvc == nil || pvc.Volume == nil {
  2273. return 0.0
  2274. }
  2275. gib := pvc.Bytes / 1024 / 1024 / 1024
  2276. hrs := pvc.Minutes() / 60.0
  2277. return pvc.Volume.CostPerGiBHour * gib * hrs
  2278. }
  2279. // Minutes computes the number of minutes over which the PVC is defined
  2280. func (pvc *PVC) Minutes() float64 {
  2281. if pvc == nil {
  2282. return 0.0
  2283. }
  2284. return pvc.End.Sub(pvc.Start).Minutes()
  2285. }
  2286. // String returns a string representation of the PVC
  2287. func (pvc *PVC) String() string {
  2288. if pvc == nil {
  2289. return "<nil>"
  2290. }
  2291. return fmt.Sprintf("%s/%s/%s{Bytes:%.2f, Cost:%.6f, Start,End:%s}", pvc.Cluster, pvc.Namespace, pvc.Name, pvc.Bytes, pvc.Cost(), kubecost.NewWindow(&pvc.Start, &pvc.End))
  2292. }
// PV describes a PersistentVolume
// TODO:CLEANUP move to pkg/kubecost?
type PV struct {
	// Bytes is the size of the volume in bytes.
	Bytes float64 `json:"bytes"`
	// CostPerGiBHour is the hourly cost of one GiB of the volume.
	CostPerGiBHour float64 `json:"costPerGiBHour"`
	// Cluster is the cluster the volume belongs to.
	Cluster string `json:"cluster"`
	// Name is the name of the volume.
	Name string `json:"name"`
	// StorageClass is filled in by buildPVCMap from the PVC info results.
	StorageClass string `json:"storageClass"`
}
  2302. // String returns a string representation of the PV
  2303. func (pv *PV) String() string {
  2304. if pv == nil {
  2305. return "<nil>"
  2306. }
  2307. return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s}", pv.Cluster, pv.Name, pv.Bytes, pv.CostPerGiBHour, pv.StorageClass)
  2308. }