allocation_helpers.go 69 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/opencost/opencost/pkg/cloud/provider"
  9. "github.com/opencost/opencost/pkg/env"
  10. "github.com/opencost/opencost/pkg/kubecost"
  11. "github.com/opencost/opencost/pkg/log"
  12. "github.com/opencost/opencost/pkg/prom"
  13. "github.com/opencost/opencost/pkg/util/timeutil"
  14. "k8s.io/apimachinery/pkg/labels"
  15. )
// CPU_SANITY_LIMIT is a bit of a hack to work around garbage data from cadvisor.
// Ideally you cap each pod to the max CPU on its node, but that involves a bit
// more complexity, as it would need to be done when allocations joins with
// asset data.
const CPU_SANITY_LIMIT = 512

// Binary byte-size units, expressed as float64 multipliers of bytes.
const KiB = 1024.0
const MiB = 1024.0 * KiB
const GiB = 1024.0 * MiB
const TiB = 1024.0 * GiB
const PiB = 1024.0 * TiB

// PV_USAGE_SANITY_LIMIT_BYTES is the sanity limit for PV usage, set to 10 PiB,
// in bytes for now.
const PV_USAGE_SANITY_LIMIT_BYTES = 10.0 * PiB
  26. /* Pod Helpers */
// buildPodMap queries Prometheus for pod running-minutes over the given
// window and populates podMap with one entry per observed pod, widening
// clusterStart/clusterEnd to cover each cluster's observed activity. When
// ingestPodUID is true, pod keys are augmented with UIDs and the mapping from
// plain key to UID-keys is recorded in podUIDKeyMap. Queries are issued in
// batches no larger than maxBatchSize, each retried up to three times.
// Returns an error only if a batch still fails after all retries.
func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSize time.Duration, podMap map[podKey]*pod, clusterStart, clusterEnd map[string]time.Time, ingestPodUID bool, podUIDKeyMap map[podKey][]podKey) error {
	// Assumes that window is positive and closed
	start, end := *window.Start(), *window.End()

	// Convert resolution duration to a query-ready string
	resStr := timeutil.DurationString(resolution)

	ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)

	// Query for (start, end) by (pod, namespace, cluster) over the given
	// window, using the given resolution, and if necessary in batches no
	// larger than the given maximum batch size. If working in batches, track
	// overall progress by starting with (window.start, window.start) and
	// querying in batches no larger than maxBatchSize from start-to-end,
	// folding each result set into podMap as the results come back.
	coverage := kubecost.NewWindow(&start, &start)
	numQuery := 1
	for coverage.End().Before(end) {
		// Determine the (start, end) of the current batch
		batchStart := *coverage.End()
		batchEnd := coverage.End().Add(maxBatchSize)
		if batchEnd.After(end) {
			batchEnd = end
		}

		// Retry the batch query up to maxTries times; resPods stays nil on
		// failure, which is what drives the retry loop.
		var resPods []*prom.QueryResult
		var err error
		maxTries := 3
		numTries := 0
		for resPods == nil && numTries < maxTries {
			numTries++

			// Query for the duration between start and end
			durStr := timeutil.DurationString(batchEnd.Sub(batchStart))
			if durStr == "" {
				// Negative duration, so set empty results and don't query
				resPods = []*prom.QueryResult{}
				err = nil
				break
			}

			// Submit and profile query
			var queryPods string
			// If ingesting UIDs, avg on them
			if ingestPodUID {
				queryPods = fmt.Sprintf(queryFmtPodsUID, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
			} else {
				queryPods = fmt.Sprintf(queryFmtPods, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
			}

			queryProfile := time.Now()
			resPods, err = ctx.QueryAtTime(queryPods, batchEnd).Await()
			if err != nil {
				log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
				resPods = nil
			}
		}

		// All retries exhausted without success: give up on the whole build.
		if err != nil {
			return err
		}

		// queryFmtPodsUID will return both UID-containing results, and non-UID-containing results,
		// so filter out the non-containing results so we don't duplicate pods. This is due to the
		// default setup of Kubecost having replicated kube_pod_container_status_running and
		// included KSM kube_pod_container_status_running. Querying w/ UID will return both.
		if ingestPodUID {
			var resPodsUID []*prom.QueryResult

			for _, res := range resPods {
				_, err := res.GetString("uid")
				if err == nil {
					resPodsUID = append(resPodsUID, res)
				}
			}

			if len(resPodsUID) > 0 {
				resPods = resPodsUID
			} else {
				log.DedupedWarningf(5, "CostModel.ComputeAllocation: UID ingestion enabled, but query did not return any results with UID")
			}
		}

		// Fold this batch's results into podMap, then advance coverage to the
		// end of the batch.
		applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods, ingestPodUID, podUIDKeyMap)

		coverage = coverage.ExpandEnd(batchEnd)
		numQuery++
	}

	return nil
}
// applyPodResults folds pod minutes query results into podMap, creating a pod
// entry per result (or widening an existing entry's Start/End), and widens
// clusterStart/clusterEnd to cover each result's observed interval. When
// ingestPodUID is true, the pod key is augmented with the pod's UID and the
// plain-key-to-UID-key association is appended to podUIDKeyMap so that later
// non-UID query results can be resolved to the UID-keyed pods.
func applyPodResults(window kubecost.Window, resolution time.Duration, podMap map[podKey]*pod, clusterStart, clusterEnd map[string]time.Time, resPods []*prom.QueryResult, ingestPodUID bool, podUIDKeyMap map[podKey][]podKey) {
	for _, res := range resPods {
		if len(res.Values) == 0 {
			log.Warnf("CostModel.ComputeAllocation: empty minutes result")
			continue
		}

		// Fall back to the configured cluster ID when the result carries no
		// cluster label.
		cluster, err := res.GetString(env.GetPromClusterLabel())
		if err != nil {
			cluster = env.GetClusterID()
		}

		labels, err := res.GetStrings("namespace", "pod")
		if err != nil {
			log.Warnf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
			continue
		}

		namespace := labels["namespace"]
		podName := labels["pod"]
		key := newPodKey(cluster, namespace, podName)

		// If thisPod UIDs are being used to ID pods, append them to the thisPod name in
		// the podKey.
		if ingestPodUID {
			uid, err := res.GetString("uid")
			if err != nil {
				// Tolerate a missing UID: keep the plain key for this result.
				log.Warnf("CostModel.ComputeAllocation: UID ingestion enabled, but query result missing field: %s", err)
			} else {
				newKey := newPodKey(cluster, namespace, podName+" "+uid)
				podUIDKeyMap[key] = append(podUIDKeyMap[key], newKey)
				key = newKey
			}
		}

		allocStart, allocEnd := calculateStartAndEnd(res, resolution, window)
		if allocStart.IsZero() || allocEnd.IsZero() {
			continue
		}

		// Set start if unset or this datum's start time is earlier than the
		// current earliest time.
		if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
			clusterStart[cluster] = allocStart
		}

		// Set end if unset or this datum's end time is later than the
		// current latest time.
		if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
			clusterEnd[cluster] = allocEnd
		}

		if thisPod, ok := podMap[key]; ok {
			// Pod has already been recorded, so update it accordingly
			if allocStart.Before(thisPod.Start) {
				thisPod.Start = allocStart
			}
			if allocEnd.After(thisPod.End) {
				thisPod.End = allocEnd
			}
		} else {
			// pod has not been recorded yet, so insert it
			podMap[key] = &pod{
				Window:      window.Clone(),
				Start:       allocStart,
				End:         allocEnd,
				Key:         key,
				Allocations: map[string]*kubecost.Allocation{},
			}
		}
	}
}
  168. func applyCPUCoresAllocated(podMap map[podKey]*pod, resCPUCoresAllocated []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  169. for _, res := range resCPUCoresAllocated {
  170. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  171. if err != nil {
  172. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
  173. continue
  174. }
  175. container, err := res.GetString("container")
  176. if err != nil {
  177. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'container': %s", key)
  178. continue
  179. }
  180. var pods []*pod
  181. if thisPod, ok := podMap[key]; !ok {
  182. if uidKeys, ok := podUIDKeyMap[key]; ok {
  183. for _, uidKey := range uidKeys {
  184. thisPod, ok = podMap[uidKey]
  185. if ok {
  186. pods = append(pods, thisPod)
  187. }
  188. }
  189. } else {
  190. continue
  191. }
  192. } else {
  193. pods = []*pod{thisPod}
  194. }
  195. for _, thisPod := range pods {
  196. if _, ok := thisPod.Allocations[container]; !ok {
  197. thisPod.appendContainer(container)
  198. }
  199. cpuCores := res.Values[0].Value
  200. if cpuCores > CPU_SANITY_LIMIT {
  201. log.Infof("[WARNING] Very large cpu allocation, clamping to %f", res.Values[0].Value*(thisPod.Allocations[container].Minutes()/60.0))
  202. cpuCores = 0.0
  203. }
  204. hours := thisPod.Allocations[container].Minutes() / 60.0
  205. thisPod.Allocations[container].CPUCoreHours = cpuCores * hours
  206. node, err := res.GetString("node")
  207. if err != nil {
  208. log.Warnf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
  209. continue
  210. }
  211. thisPod.Allocations[container].Properties.Node = node
  212. thisPod.Node = node
  213. }
  214. }
  215. }
  216. func applyCPUCoresRequested(podMap map[podKey]*pod, resCPUCoresRequested []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  217. for _, res := range resCPUCoresRequested {
  218. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  219. if err != nil {
  220. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
  221. continue
  222. }
  223. container, err := res.GetString("container")
  224. if err != nil {
  225. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'container': %s", key)
  226. continue
  227. }
  228. var pods []*pod
  229. if thisPod, ok := podMap[key]; !ok {
  230. if uidKeys, ok := podUIDKeyMap[key]; ok {
  231. for _, uidKey := range uidKeys {
  232. thisPod, ok = podMap[uidKey]
  233. if ok {
  234. pods = append(pods, thisPod)
  235. }
  236. }
  237. } else {
  238. continue
  239. }
  240. } else {
  241. pods = []*pod{thisPod}
  242. }
  243. for _, thisPod := range pods {
  244. if _, ok := thisPod.Allocations[container]; !ok {
  245. thisPod.appendContainer(container)
  246. }
  247. thisPod.Allocations[container].CPUCoreRequestAverage = res.Values[0].Value
  248. // If CPU allocation is less than requests, set CPUCoreHours to
  249. // request level.
  250. if thisPod.Allocations[container].CPUCores() < res.Values[0].Value {
  251. thisPod.Allocations[container].CPUCoreHours = res.Values[0].Value * (thisPod.Allocations[container].Minutes() / 60.0)
  252. }
  253. if thisPod.Allocations[container].CPUCores() > CPU_SANITY_LIMIT {
  254. log.Infof("[WARNING] Very large cpu allocation, clamping! to %f", res.Values[0].Value*(thisPod.Allocations[container].Minutes()/60.0))
  255. thisPod.Allocations[container].CPUCoreHours = res.Values[0].Value * (thisPod.Allocations[container].Minutes() / 60.0)
  256. }
  257. node, err := res.GetString("node")
  258. if err != nil {
  259. log.Warnf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
  260. continue
  261. }
  262. thisPod.Allocations[container].Properties.Node = node
  263. thisPod.Node = node
  264. }
  265. }
  266. }
  267. func applyCPUCoresUsedAvg(podMap map[podKey]*pod, resCPUCoresUsedAvg []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  268. for _, res := range resCPUCoresUsedAvg {
  269. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  270. if err != nil {
  271. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg result missing field: %s", err)
  272. continue
  273. }
  274. container, err := res.GetString("container")
  275. if container == "" || err != nil {
  276. container, err = res.GetString("container_name")
  277. if err != nil {
  278. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg query result missing 'container': %s", key)
  279. continue
  280. }
  281. }
  282. var pods []*pod
  283. if thisPod, ok := podMap[key]; !ok {
  284. if uidKeys, ok := podUIDKeyMap[key]; ok {
  285. for _, uidKey := range uidKeys {
  286. thisPod, ok = podMap[uidKey]
  287. if ok {
  288. pods = append(pods, thisPod)
  289. }
  290. }
  291. } else {
  292. continue
  293. }
  294. } else {
  295. pods = []*pod{thisPod}
  296. }
  297. for _, thisPod := range pods {
  298. if _, ok := thisPod.Allocations[container]; !ok {
  299. thisPod.appendContainer(container)
  300. }
  301. thisPod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
  302. if res.Values[0].Value > CPU_SANITY_LIMIT {
  303. log.Infof("[WARNING] Very large cpu USAGE, dropping outlier")
  304. thisPod.Allocations[container].CPUCoreUsageAverage = 0.0
  305. }
  306. }
  307. }
  308. }
  309. func applyCPUCoresUsedMax(podMap map[podKey]*pod, resCPUCoresUsedMax []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  310. for _, res := range resCPUCoresUsedMax {
  311. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  312. if err != nil {
  313. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max result missing field: %s", err)
  314. continue
  315. }
  316. container, err := res.GetString("container")
  317. if container == "" || err != nil {
  318. container, err = res.GetString("container_name")
  319. if err != nil {
  320. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max query result missing 'container': %s", key)
  321. continue
  322. }
  323. }
  324. var pods []*pod
  325. if thisPod, ok := podMap[key]; !ok {
  326. if uidKeys, ok := podUIDKeyMap[key]; ok {
  327. for _, uidKey := range uidKeys {
  328. thisPod, ok = podMap[uidKey]
  329. if ok {
  330. pods = append(pods, thisPod)
  331. }
  332. }
  333. } else {
  334. continue
  335. }
  336. } else {
  337. pods = []*pod{thisPod}
  338. }
  339. for _, thisPod := range pods {
  340. if _, ok := thisPod.Allocations[container]; !ok {
  341. thisPod.appendContainer(container)
  342. }
  343. if thisPod.Allocations[container].RawAllocationOnly == nil {
  344. thisPod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  345. CPUCoreUsageMax: res.Values[0].Value,
  346. }
  347. } else {
  348. thisPod.Allocations[container].RawAllocationOnly.CPUCoreUsageMax = res.Values[0].Value
  349. }
  350. }
  351. }
  352. }
  353. func applyRAMBytesAllocated(podMap map[podKey]*pod, resRAMBytesAllocated []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  354. for _, res := range resRAMBytesAllocated {
  355. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  356. if err != nil {
  357. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
  358. continue
  359. }
  360. container, err := res.GetString("container")
  361. if err != nil {
  362. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'container': %s", key)
  363. continue
  364. }
  365. var pods []*pod
  366. if thisPod, ok := podMap[key]; !ok {
  367. if uidKeys, ok := podUIDKeyMap[key]; ok {
  368. for _, uidKey := range uidKeys {
  369. thisPod, ok = podMap[uidKey]
  370. if ok {
  371. pods = append(pods, thisPod)
  372. }
  373. }
  374. } else {
  375. continue
  376. }
  377. } else {
  378. pods = []*pod{thisPod}
  379. }
  380. for _, thisPod := range pods {
  381. if _, ok := thisPod.Allocations[container]; !ok {
  382. thisPod.appendContainer(container)
  383. }
  384. ramBytes := res.Values[0].Value
  385. hours := thisPod.Allocations[container].Minutes() / 60.0
  386. thisPod.Allocations[container].RAMByteHours = ramBytes * hours
  387. node, err := res.GetString("node")
  388. if err != nil {
  389. log.Warnf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
  390. continue
  391. }
  392. thisPod.Allocations[container].Properties.Node = node
  393. thisPod.Node = node
  394. }
  395. }
  396. }
  397. func applyRAMBytesRequested(podMap map[podKey]*pod, resRAMBytesRequested []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  398. for _, res := range resRAMBytesRequested {
  399. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  400. if err != nil {
  401. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
  402. continue
  403. }
  404. container, err := res.GetString("container")
  405. if err != nil {
  406. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'container': %s", key)
  407. continue
  408. }
  409. var pods []*pod
  410. if thisPod, ok := podMap[key]; !ok {
  411. if uidKeys, ok := podUIDKeyMap[key]; ok {
  412. for _, uidKey := range uidKeys {
  413. thisPod, ok = podMap[uidKey]
  414. if ok {
  415. pods = append(pods, thisPod)
  416. }
  417. }
  418. } else {
  419. continue
  420. }
  421. } else {
  422. pods = []*pod{thisPod}
  423. }
  424. for _, pod := range pods {
  425. if _, ok := pod.Allocations[container]; !ok {
  426. pod.appendContainer(container)
  427. }
  428. pod.Allocations[container].RAMBytesRequestAverage = res.Values[0].Value
  429. // If RAM allocation is less than requests, set RAMByteHours to
  430. // request level.
  431. if pod.Allocations[container].RAMBytes() < res.Values[0].Value {
  432. pod.Allocations[container].RAMByteHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  433. }
  434. node, err := res.GetString("node")
  435. if err != nil {
  436. log.Warnf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
  437. continue
  438. }
  439. pod.Allocations[container].Properties.Node = node
  440. pod.Node = node
  441. }
  442. }
  443. }
  444. func applyRAMBytesUsedAvg(podMap map[podKey]*pod, resRAMBytesUsedAvg []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  445. for _, res := range resRAMBytesUsedAvg {
  446. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  447. if err != nil {
  448. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM avg usage result missing field: %s", err)
  449. continue
  450. }
  451. container, err := res.GetString("container")
  452. if container == "" || err != nil {
  453. container, err = res.GetString("container_name")
  454. if err != nil {
  455. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage avg query result missing 'container': %s", key)
  456. continue
  457. }
  458. }
  459. var pods []*pod
  460. if thisPod, ok := podMap[key]; !ok {
  461. if uidKeys, ok := podUIDKeyMap[key]; ok {
  462. for _, uidKey := range uidKeys {
  463. thisPod, ok = podMap[uidKey]
  464. if ok {
  465. pods = append(pods, thisPod)
  466. }
  467. }
  468. } else {
  469. continue
  470. }
  471. } else {
  472. pods = []*pod{thisPod}
  473. }
  474. for _, thisPod := range pods {
  475. if _, ok := thisPod.Allocations[container]; !ok {
  476. thisPod.appendContainer(container)
  477. }
  478. thisPod.Allocations[container].RAMBytesUsageAverage = res.Values[0].Value
  479. }
  480. }
  481. }
  482. func applyRAMBytesUsedMax(podMap map[podKey]*pod, resRAMBytesUsedMax []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  483. for _, res := range resRAMBytesUsedMax {
  484. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  485. if err != nil {
  486. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max result missing field: %s", err)
  487. continue
  488. }
  489. container, err := res.GetString("container")
  490. if container == "" || err != nil {
  491. container, err = res.GetString("container_name")
  492. if err != nil {
  493. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max query result missing 'container': %s", key)
  494. continue
  495. }
  496. }
  497. var pods []*pod
  498. if thisPod, ok := podMap[key]; !ok {
  499. if uidKeys, ok := podUIDKeyMap[key]; ok {
  500. for _, uidKey := range uidKeys {
  501. thisPod, ok = podMap[uidKey]
  502. if ok {
  503. pods = append(pods, thisPod)
  504. }
  505. }
  506. } else {
  507. continue
  508. }
  509. } else {
  510. pods = []*pod{thisPod}
  511. }
  512. for _, thisPod := range pods {
  513. if _, ok := thisPod.Allocations[container]; !ok {
  514. thisPod.appendContainer(container)
  515. }
  516. if thisPod.Allocations[container].RawAllocationOnly == nil {
  517. thisPod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  518. RAMBytesUsageMax: res.Values[0].Value,
  519. }
  520. } else {
  521. thisPod.Allocations[container].RawAllocationOnly.RAMBytesUsageMax = res.Values[0].Value
  522. }
  523. }
  524. }
  525. }
  526. func applyGPUsAllocated(podMap map[podKey]*pod, resGPUsRequested []*prom.QueryResult, resGPUsAllocated []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  527. if len(resGPUsAllocated) > 0 { // Use the new query, when it's become available in a window
  528. resGPUsRequested = resGPUsAllocated
  529. }
  530. for _, res := range resGPUsRequested {
  531. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  532. if err != nil {
  533. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
  534. continue
  535. }
  536. container, err := res.GetString("container")
  537. if err != nil {
  538. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request query result missing 'container': %s", key)
  539. continue
  540. }
  541. var pods []*pod
  542. if thisPod, ok := podMap[key]; !ok {
  543. if uidKeys, ok := podUIDKeyMap[key]; ok {
  544. for _, uidKey := range uidKeys {
  545. thisPod, ok = podMap[uidKey]
  546. if ok {
  547. pods = append(pods, thisPod)
  548. }
  549. }
  550. } else {
  551. continue
  552. }
  553. } else {
  554. pods = []*pod{thisPod}
  555. }
  556. for _, thisPod := range pods {
  557. if _, ok := thisPod.Allocations[container]; !ok {
  558. thisPod.appendContainer(container)
  559. }
  560. hrs := thisPod.Allocations[container].Minutes() / 60.0
  561. thisPod.Allocations[container].GPUHours = res.Values[0].Value * hrs
  562. }
  563. }
  564. }
  565. func applyNetworkTotals(podMap map[podKey]*pod, resNetworkTransferBytes []*prom.QueryResult, resNetworkReceiveBytes []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey) {
  566. for _, res := range resNetworkTransferBytes {
  567. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  568. if err != nil {
  569. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Transfer Bytes query result missing field: %s", err)
  570. continue
  571. }
  572. var pods []*pod
  573. if thisPod, ok := podMap[podKey]; !ok {
  574. if uidKeys, ok := podUIDKeyMap[podKey]; ok {
  575. for _, uidKey := range uidKeys {
  576. thisPod, ok = podMap[uidKey]
  577. if ok {
  578. pods = append(pods, thisPod)
  579. }
  580. }
  581. } else {
  582. continue
  583. }
  584. } else {
  585. pods = []*pod{thisPod}
  586. }
  587. for _, thisPod := range pods {
  588. for _, alloc := range thisPod.Allocations {
  589. alloc.NetworkTransferBytes = res.Values[0].Value / float64(len(thisPod.Allocations)) / float64(len(pods))
  590. }
  591. }
  592. }
  593. for _, res := range resNetworkReceiveBytes {
  594. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  595. if err != nil {
  596. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Receive Bytes query result missing field: %s", err)
  597. continue
  598. }
  599. var pods []*pod
  600. if thisPod, ok := podMap[podKey]; !ok {
  601. if uidKeys, ok := podUIDKeyMap[podKey]; ok {
  602. for _, uidKey := range uidKeys {
  603. thisPod, ok = podMap[uidKey]
  604. if ok {
  605. pods = append(pods, thisPod)
  606. }
  607. }
  608. } else {
  609. continue
  610. }
  611. } else {
  612. pods = []*pod{thisPod}
  613. }
  614. for _, thisPod := range pods {
  615. for _, alloc := range thisPod.Allocations {
  616. alloc.NetworkReceiveBytes = res.Values[0].Value / float64(len(thisPod.Allocations)) / float64(len(pods))
  617. }
  618. }
  619. }
  620. }
  621. func applyNetworkAllocation(podMap map[podKey]*pod, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, networkCostSubType string) {
  622. costPerGiBByCluster := map[string]float64{}
  623. for _, res := range resNetworkCostPerGiB {
  624. cluster, err := res.GetString(env.GetPromClusterLabel())
  625. if err != nil {
  626. cluster = env.GetClusterID()
  627. }
  628. costPerGiBByCluster[cluster] = res.Values[0].Value
  629. }
  630. for _, res := range resNetworkGiB {
  631. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  632. if err != nil {
  633. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
  634. continue
  635. }
  636. var pods []*pod
  637. if thisPod, ok := podMap[podKey]; !ok {
  638. if uidKeys, ok := podUIDKeyMap[podKey]; ok {
  639. for _, uidKey := range uidKeys {
  640. thisPod, ok = podMap[uidKey]
  641. if ok {
  642. pods = append(pods, thisPod)
  643. }
  644. }
  645. } else {
  646. continue
  647. }
  648. } else {
  649. pods = []*pod{thisPod}
  650. }
  651. for _, thisPod := range pods {
  652. for _, alloc := range thisPod.Allocations {
  653. gib := res.Values[0].Value / float64(len(thisPod.Allocations))
  654. costPerGiB := costPerGiBByCluster[podKey.Cluster]
  655. currentNetworkSubCost := gib * costPerGiB / float64(len(pods))
  656. switch networkCostSubType {
  657. case networkCrossZoneCost:
  658. alloc.NetworkCrossZoneCost = currentNetworkSubCost
  659. case networkCrossRegionCost:
  660. alloc.NetworkCrossRegionCost = currentNetworkSubCost
  661. case networkInternetCost:
  662. alloc.NetworkInternetCost = currentNetworkSubCost
  663. default:
  664. log.Warnf("CostModel.applyNetworkAllocation: unknown network subtype passed to the function: %s", networkCostSubType)
  665. }
  666. alloc.NetworkCost += currentNetworkSubCost
  667. }
  668. }
  669. }
  670. }
  671. func resToNodeLabels(resNodeLabels []*prom.QueryResult) map[nodeKey]map[string]string {
  672. nodeLabels := map[nodeKey]map[string]string{}
  673. for _, res := range resNodeLabels {
  674. nodeKey, err := resultNodeKey(res, env.GetPromClusterLabel(), "node")
  675. if err != nil {
  676. continue
  677. }
  678. if _, ok := nodeLabels[nodeKey]; !ok {
  679. nodeLabels[nodeKey] = map[string]string{}
  680. }
  681. for _, rawK := range env.GetAllocationNodeLabelsIncludeList() {
  682. labels := res.GetLabels()
  683. // Sanitize the given label name to match Prometheus formatting
  684. // e.g. topology.kubernetes.io/zone => topology_kubernetes_io_zone
  685. k := prom.SanitizeLabelName(rawK)
  686. if v, ok := labels[k]; ok {
  687. nodeLabels[nodeKey][k] = v
  688. continue
  689. }
  690. // Try with the "label_" prefix, if not found
  691. // e.g. topology_kubernetes_io_zone => label_topology_kubernetes_io_zone
  692. k = fmt.Sprintf("label_%s", k)
  693. if v, ok := labels[k]; ok {
  694. nodeLabels[nodeKey][k] = v
  695. }
  696. }
  697. }
  698. return nodeLabels
  699. }
  700. func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[namespaceKey]map[string]string {
  701. namespaceLabels := map[namespaceKey]map[string]string{}
  702. for _, res := range resNamespaceLabels {
  703. nsKey, err := resultNamespaceKey(res, env.GetPromClusterLabel(), "namespace")
  704. if err != nil {
  705. continue
  706. }
  707. if _, ok := namespaceLabels[nsKey]; !ok {
  708. namespaceLabels[nsKey] = map[string]string{}
  709. }
  710. for k, l := range res.GetLabels() {
  711. namespaceLabels[nsKey][k] = l
  712. }
  713. }
  714. return namespaceLabels
  715. }
  716. func resToPodLabels(resPodLabels []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) map[podKey]map[string]string {
  717. podLabels := map[podKey]map[string]string{}
  718. for _, res := range resPodLabels {
  719. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  720. if err != nil {
  721. continue
  722. }
  723. var keys []podKey
  724. if ingestPodUID {
  725. if uidKeys, ok := podUIDKeyMap[key]; ok {
  726. keys = append(keys, uidKeys...)
  727. }
  728. } else {
  729. keys = []podKey{key}
  730. }
  731. for _, key := range keys {
  732. if _, ok := podLabels[key]; !ok {
  733. podLabels[key] = map[string]string{}
  734. }
  735. for k, l := range res.GetLabels() {
  736. podLabels[key][k] = l
  737. }
  738. }
  739. }
  740. return podLabels
  741. }
  742. func resToNamespaceAnnotations(resNamespaceAnnotations []*prom.QueryResult) map[string]map[string]string {
  743. namespaceAnnotations := map[string]map[string]string{}
  744. for _, res := range resNamespaceAnnotations {
  745. namespace, err := res.GetString("namespace")
  746. if err != nil {
  747. continue
  748. }
  749. if _, ok := namespaceAnnotations[namespace]; !ok {
  750. namespaceAnnotations[namespace] = map[string]string{}
  751. }
  752. for k, l := range res.GetAnnotations() {
  753. namespaceAnnotations[namespace][k] = l
  754. }
  755. }
  756. return namespaceAnnotations
  757. }
  758. func resToPodAnnotations(resPodAnnotations []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) map[podKey]map[string]string {
  759. podAnnotations := map[podKey]map[string]string{}
  760. for _, res := range resPodAnnotations {
  761. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  762. if err != nil {
  763. continue
  764. }
  765. var keys []podKey
  766. if ingestPodUID {
  767. if uidKeys, ok := podUIDKeyMap[key]; ok {
  768. keys = append(keys, uidKeys...)
  769. }
  770. } else {
  771. keys = []podKey{key}
  772. }
  773. for _, key := range keys {
  774. if _, ok := podAnnotations[key]; !ok {
  775. podAnnotations[key] = map[string]string{}
  776. }
  777. for k, l := range res.GetAnnotations() {
  778. podAnnotations[key][k] = l
  779. }
  780. }
  781. }
  782. return podAnnotations
  783. }
// applyLabels merges node, namespace, and pod labels onto every allocation in
// podMap, writing the merged set to Properties.Labels and the namespace-only
// set to Properties.NamespaceLabels. Later sources win on key collisions:
// pod labels overwrite namespace labels, which overwrite node labels.
func applyLabels(podMap map[podKey]*pod, nodeLabels map[nodeKey]map[string]string, namespaceLabels map[namespaceKey]map[string]string, podLabels map[podKey]map[string]string) {
	for podKey, pod := range podMap {
		for _, alloc := range pod.Allocations {
			// Start from any labels already on the allocation, creating the
			// maps if they have not been initialized yet.
			allocLabels := alloc.Properties.Labels
			if allocLabels == nil {
				allocLabels = make(map[string]string)
			}
			nsLabels := alloc.Properties.NamespaceLabels
			if nsLabels == nil {
				nsLabels = make(map[string]string)
			}
			// Apply node labels first, then namespace labels, then pod labels
			// so that pod labels overwrite namespace labels, which overwrite
			// node labels.
			if nodeLabels != nil {
				nodeKey := newNodeKey(pod.Key.Cluster, pod.Node)
				if labels, ok := nodeLabels[nodeKey]; ok {
					for k, v := range labels {
						allocLabels[k] = v
					}
				}
			}
			// Namespace labels go to both the merged set and the
			// namespace-only set.
			nsKey := podKey.namespaceKey
			if labels, ok := namespaceLabels[nsKey]; ok {
				for k, v := range labels {
					allocLabels[k] = v
					nsLabels[k] = v
				}
			}
			if labels, ok := podLabels[podKey]; ok {
				for k, v := range labels {
					allocLabels[k] = v
				}
			}
			alloc.Properties.Labels = allocLabels
			alloc.Properties.NamespaceLabels = nsLabels
		}
	}
}
// applyAnnotations merges namespace and pod annotations onto every allocation
// in podMap, writing the merged set to Properties.Annotations and the
// namespace-only set to Properties.NamespaceAnnotations. Pod annotations win
// over namespace annotations on key collisions.
func applyAnnotations(podMap map[podKey]*pod, namespaceAnnotations map[string]map[string]string, podAnnotations map[podKey]map[string]string) {
	for key, pod := range podMap {
		for _, alloc := range pod.Allocations {
			// Start from any annotations already on the allocation, creating
			// the maps if they have not been initialized yet.
			allocAnnotations := alloc.Properties.Annotations
			if allocAnnotations == nil {
				allocAnnotations = make(map[string]string)
			}
			nsAnnotations := alloc.Properties.NamespaceAnnotations
			if nsAnnotations == nil {
				nsAnnotations = make(map[string]string)
			}
			// Apply namespace annotations first, then pod annotations so that
			// pod annotations overwrite namespace annotations.
			if labels, ok := namespaceAnnotations[key.Namespace]; ok {
				for k, v := range labels {
					allocAnnotations[k] = v
					nsAnnotations[k] = v
				}
			}
			if labels, ok := podAnnotations[key]; ok {
				for k, v := range labels {
					allocAnnotations[k] = v
				}
			}
			alloc.Properties.Annotations = allocAnnotations
			alloc.Properties.NamespaceAnnotations = nsAnnotations
		}
	}
}
  852. func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  853. deploymentLabels := map[controllerKey]map[string]string{}
  854. for _, res := range resDeploymentLabels {
  855. controllerKey, err := resultDeploymentKey(res, env.GetPromClusterLabel(), "namespace", "deployment")
  856. if err != nil {
  857. continue
  858. }
  859. if _, ok := deploymentLabels[controllerKey]; !ok {
  860. deploymentLabels[controllerKey] = map[string]string{}
  861. }
  862. for k, l := range res.GetLabels() {
  863. deploymentLabels[controllerKey][k] = l
  864. }
  865. }
  866. // Prune duplicate deployments. That is, if the same deployment exists with
  867. // hyphens instead of underscores, keep the one that uses hyphens.
  868. for key := range deploymentLabels {
  869. if strings.Contains(key.Controller, "_") {
  870. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  871. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  872. if _, ok := deploymentLabels[duplicateKey]; ok {
  873. delete(deploymentLabels, key)
  874. }
  875. }
  876. }
  877. return deploymentLabels
  878. }
  879. func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  880. statefulSetLabels := map[controllerKey]map[string]string{}
  881. for _, res := range resStatefulSetLabels {
  882. controllerKey, err := resultStatefulSetKey(res, env.GetPromClusterLabel(), "namespace", "statefulSet")
  883. if err != nil {
  884. continue
  885. }
  886. if _, ok := statefulSetLabels[controllerKey]; !ok {
  887. statefulSetLabels[controllerKey] = map[string]string{}
  888. }
  889. for k, l := range res.GetLabels() {
  890. statefulSetLabels[controllerKey][k] = l
  891. }
  892. }
  893. // Prune duplicate stateful sets. That is, if the same stateful set exists
  894. // with hyphens instead of underscores, keep the one that uses hyphens.
  895. for key := range statefulSetLabels {
  896. if strings.Contains(key.Controller, "_") {
  897. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  898. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  899. if _, ok := statefulSetLabels[duplicateKey]; ok {
  900. delete(statefulSetLabels, key)
  901. }
  902. }
  903. }
  904. return statefulSetLabels
  905. }
  906. func labelsToPodControllerMap(podLabels map[podKey]map[string]string, controllerLabels map[controllerKey]map[string]string) map[podKey]controllerKey {
  907. podControllerMap := map[podKey]controllerKey{}
  908. // For each controller, turn the labels into a selector and attempt to
  909. // match it with each set of pod labels. A match indicates that the pod
  910. // belongs to the controller.
  911. for cKey, cLabels := range controllerLabels {
  912. selector := labels.Set(cLabels).AsSelectorPreValidated()
  913. for pKey, pLabels := range podLabels {
  914. // If the pod is in a different cluster or namespace, there is
  915. // no need to compare the labels.
  916. if cKey.Cluster != pKey.Cluster || cKey.Namespace != pKey.Namespace {
  917. continue
  918. }
  919. podLabelSet := labels.Set(pLabels)
  920. if selector.Matches(podLabelSet) {
  921. if _, ok := podControllerMap[pKey]; ok {
  922. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PodControllerMap match already exists: %s matches %s and %s", pKey, podControllerMap[pKey], cKey)
  923. }
  924. podControllerMap[pKey] = cKey
  925. }
  926. }
  927. }
  928. return podControllerMap
  929. }
  930. func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) map[podKey]controllerKey {
  931. daemonSetLabels := map[podKey]controllerKey{}
  932. for _, res := range resDaemonSetLabels {
  933. controllerKey, err := resultDaemonSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  934. if err != nil {
  935. continue
  936. }
  937. pod, err := res.GetString("pod")
  938. if err != nil {
  939. log.Warnf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
  940. }
  941. key := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  942. var keys []podKey
  943. if ingestPodUID {
  944. if uidKeys, ok := podUIDKeyMap[key]; ok {
  945. keys = append(keys, uidKeys...)
  946. }
  947. } else {
  948. keys = []podKey{key}
  949. }
  950. for _, key := range keys {
  951. daemonSetLabels[key] = controllerKey
  952. }
  953. }
  954. return daemonSetLabels
  955. }
  956. func resToPodJobMap(resJobLabels []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) map[podKey]controllerKey {
  957. jobLabels := map[podKey]controllerKey{}
  958. for _, res := range resJobLabels {
  959. controllerKey, err := resultJobKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  960. if err != nil {
  961. continue
  962. }
  963. // Convert the name of Jobs generated by CronJobs to the name of the
  964. // CronJob by stripping the timestamp off the end.
  965. match := isCron.FindStringSubmatch(controllerKey.Controller)
  966. if match != nil {
  967. controllerKey.Controller = match[1]
  968. }
  969. pod, err := res.GetString("pod")
  970. if err != nil {
  971. log.Warnf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
  972. }
  973. key := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  974. var keys []podKey
  975. if ingestPodUID {
  976. if uidKeys, ok := podUIDKeyMap[key]; ok {
  977. keys = append(keys, uidKeys...)
  978. }
  979. } else {
  980. keys = []podKey{key}
  981. }
  982. for _, key := range keys {
  983. jobLabels[key] = controllerKey
  984. }
  985. }
  986. return jobLabels
  987. }
  988. func resToPodReplicaSetMap(resPodsWithReplicaSetOwner []*prom.QueryResult, resReplicaSetsWithoutOwners []*prom.QueryResult, resReplicaSetsWithRolloutOwner []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) map[podKey]controllerKey {
  989. // Build out set of ReplicaSets that have no owners, themselves, such that
  990. // the ReplicaSet should be used as the owner of the Pods it controls.
  991. // (This should exclude, for example, ReplicaSets that are controlled by
  992. // Deployments, in which case the Deployment should be the pod's owner.)
  993. // Additionally, add to this set of ReplicaSets those ReplicaSets that
  994. // are owned by a Rollout
  995. replicaSets := map[controllerKey]struct{}{}
  996. // Create unowned ReplicaSet controller keys
  997. for _, res := range resReplicaSetsWithoutOwners {
  998. controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "replicaset")
  999. if err != nil {
  1000. continue
  1001. }
  1002. replicaSets[controllerKey] = struct{}{}
  1003. }
  1004. // Create Rollout-owned ReplicaSet controller keys
  1005. for _, res := range resReplicaSetsWithRolloutOwner {
  1006. controllerKey, err := resultReplicaSetRolloutKey(res, env.GetPromClusterLabel(), "namespace", "replicaset")
  1007. if err != nil {
  1008. continue
  1009. }
  1010. replicaSets[controllerKey] = struct{}{}
  1011. }
  1012. // Create the mapping of Pods to ReplicaSets, ignoring any ReplicaSets that
  1013. // do not appear in the set of unowned/Rollout-owned ReplicaSets above.
  1014. podToReplicaSet := map[podKey]controllerKey{}
  1015. for _, res := range resPodsWithReplicaSetOwner {
  1016. // First, check if this pod is owned by an unowned ReplicaSet
  1017. controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1018. if err != nil {
  1019. continue
  1020. } else if _, ok := replicaSets[controllerKey]; !ok {
  1021. // If the pod is not owned by an unowned ReplicaSet, check if
  1022. // it's owned by a Rollout-owned ReplicaSet
  1023. controllerKey, err = resultReplicaSetRolloutKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1024. if err != nil {
  1025. continue
  1026. } else if _, ok := replicaSets[controllerKey]; !ok {
  1027. continue
  1028. }
  1029. }
  1030. pod, err := res.GetString("pod")
  1031. if err != nil {
  1032. log.Warnf("CostModel.ComputeAllocation: ReplicaSet result without pod: %s", controllerKey)
  1033. }
  1034. key := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1035. var keys []podKey
  1036. if ingestPodUID {
  1037. if uidKeys, ok := podUIDKeyMap[key]; ok {
  1038. keys = append(keys, uidKeys...)
  1039. }
  1040. } else {
  1041. keys = []podKey{key}
  1042. }
  1043. for _, key := range keys {
  1044. podToReplicaSet[key] = controllerKey
  1045. }
  1046. }
  1047. return podToReplicaSet
  1048. }
  1049. func applyControllersToPods(podMap map[podKey]*pod, podControllerMap map[podKey]controllerKey) {
  1050. for key, pod := range podMap {
  1051. for _, alloc := range pod.Allocations {
  1052. if controllerKey, ok := podControllerMap[key]; ok {
  1053. alloc.Properties.ControllerKind = controllerKey.ControllerKind
  1054. alloc.Properties.Controller = controllerKey.Controller
  1055. }
  1056. }
  1057. }
  1058. }
  1059. /* Service Helpers */
  1060. func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[string]string {
  1061. serviceLabels := map[serviceKey]map[string]string{}
  1062. for _, res := range resServiceLabels {
  1063. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service")
  1064. if err != nil {
  1065. continue
  1066. }
  1067. if _, ok := serviceLabels[serviceKey]; !ok {
  1068. serviceLabels[serviceKey] = map[string]string{}
  1069. }
  1070. for k, l := range res.GetLabels() {
  1071. serviceLabels[serviceKey][k] = l
  1072. }
  1073. }
  1074. // Prune duplicate services. That is, if the same service exists with
  1075. // hyphens instead of underscores, keep the one that uses hyphens.
  1076. for key := range serviceLabels {
  1077. if strings.Contains(key.Service, "_") {
  1078. duplicateService := strings.Replace(key.Service, "_", "-", -1)
  1079. duplicateKey := newServiceKey(key.Cluster, key.Namespace, duplicateService)
  1080. if _, ok := serviceLabels[duplicateKey]; ok {
  1081. delete(serviceLabels, key)
  1082. }
  1083. }
  1084. }
  1085. return serviceLabels
  1086. }
// applyServicesToPods matches pods to services by treating each service's
// label set as a selector against each pod's labels, then records the
// matching service names on every allocation's Properties.Services and
// registers the allocation under each matching service in allocsByService
// (which is mutated as an output parameter).
func applyServicesToPods(podMap map[podKey]*pod, podLabels map[podKey]map[string]string, allocsByService map[serviceKey][]*kubecost.Allocation, serviceLabels map[serviceKey]map[string]string) {
	podServicesMap := map[podKey][]serviceKey{}
	// For each service, turn the labels into a selector and attempt to
	// match it with each set of pod labels. A match indicates that the pod
	// belongs to the service.
	for sKey, sLabels := range serviceLabels {
		selector := labels.Set(sLabels).AsSelectorPreValidated()
		for pKey, pLabels := range podLabels {
			// If the pod is in a different cluster or namespace, there is
			// no need to compare the labels.
			if sKey.Cluster != pKey.Cluster || sKey.Namespace != pKey.Namespace {
				continue
			}
			podLabelSet := labels.Set(pLabels)
			if selector.Matches(podLabelSet) {
				if _, ok := podServicesMap[pKey]; !ok {
					podServicesMap[pKey] = []serviceKey{}
				}
				podServicesMap[pKey] = append(podServicesMap[pKey], sKey)
			}
		}
	}
	// For each allocation in each pod, attempt to find and apply the list of
	// services associated with the allocation's pod.
	for key, pod := range podMap {
		for _, alloc := range pod.Allocations {
			if sKeys, ok := podServicesMap[key]; ok {
				services := []string{}
				for _, sKey := range sKeys {
					services = append(services, sKey.Service)
					allocsByService[sKey] = append(allocsByService[sKey], alloc)
				}
				alloc.Properties.Services = services
			}
		}
	}
}
  1124. func getLoadBalancerCosts(lbMap map[serviceKey]*lbCost, resLBCost, resLBActiveMins []*prom.QueryResult, resolution time.Duration, window kubecost.Window) {
  1125. for _, res := range resLBActiveMins {
  1126. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
  1127. if err != nil || len(res.Values) == 0 {
  1128. continue
  1129. }
  1130. // load balancers have interpolation for costs, we don't need to offset the resolution
  1131. lbStart, lbEnd := calculateStartAndEnd(res, resolution, window)
  1132. if lbStart.IsZero() || lbEnd.IsZero() {
  1133. log.Warnf("CostModel.ComputeAllocation: pvc %s has no running time", serviceKey)
  1134. }
  1135. lbMap[serviceKey] = &lbCost{
  1136. Start: lbStart,
  1137. End: lbEnd,
  1138. }
  1139. }
  1140. for _, res := range resLBCost {
  1141. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
  1142. if err != nil {
  1143. continue
  1144. }
  1145. // get the ingress IP to determine if this is a private LB
  1146. ip, err := res.GetString("ingress_ip")
  1147. if err != nil {
  1148. log.Warnf("error getting ingress ip for key %s: %v, skipping", serviceKey, err)
  1149. // do not count the time that the service was being created or deleted
  1150. // ingress IP will be empty string
  1151. // only add cost to allocation when external IP is provisioned
  1152. if ip == "" {
  1153. continue
  1154. }
  1155. }
  1156. // Apply cost as price-per-hour * hours
  1157. if lb, ok := lbMap[serviceKey]; ok {
  1158. lbPricePerHr := res.Values[0].Value
  1159. // interpolate any missing data
  1160. resolutionHours := resolution.Hours()
  1161. resultHours := lb.End.Sub(lb.Start).Hours()
  1162. scaleFactor := (resolutionHours + resultHours) / resultHours
  1163. // after scaling, we can adjust the timings to reflect the interpolated data
  1164. lb.End = lb.End.Add(resolution)
  1165. lb.TotalCost += lbPricePerHr * resultHours * scaleFactor
  1166. lb.Ip = ip
  1167. lb.Private = privateIPCheck(ip)
  1168. } else {
  1169. log.DedupedWarningf(20, "CostModel: found minutes for key that does not exist: %s", serviceKey)
  1170. }
  1171. }
  1172. }
  1173. func applyLoadBalancersToPods(window kubecost.Window, podMap map[podKey]*pod, lbMap map[serviceKey]*lbCost, allocsByService map[serviceKey][]*kubecost.Allocation) {
  1174. for sKey, lb := range lbMap {
  1175. totalHours := 0.0
  1176. allocHours := make(map[*kubecost.Allocation]float64)
  1177. allocs, ok := allocsByService[sKey]
  1178. // if there are no allocations using the service, add its cost to the Unmounted pod for its cluster
  1179. if !ok {
  1180. pod := getUnmountedPodForCluster(window, podMap, sKey.Cluster)
  1181. pod.Allocations[kubecost.UnmountedSuffix].LoadBalancerCost += lb.TotalCost
  1182. pod.Allocations[kubecost.UnmountedSuffix].Properties.Services = append(pod.Allocations[kubecost.UnmountedSuffix].Properties.Services, sKey.Service)
  1183. }
  1184. // Add portion of load balancing cost to each allocation
  1185. // proportional to the total number of hours allocations used the load balancer
  1186. for _, alloc := range allocs {
  1187. // Determine the (start, end) of the relationship between the
  1188. // given lbCost and the associated Allocation so that a precise
  1189. // number of hours can be used to compute cumulative cost.
  1190. s, e := alloc.Start, alloc.End
  1191. if lb.Start.After(alloc.Start) {
  1192. s = lb.Start
  1193. }
  1194. if lb.End.Before(alloc.End) {
  1195. e = lb.End
  1196. }
  1197. hours := e.Sub(s).Hours()
  1198. // A negative number of hours signifies no overlap between the windows
  1199. if hours > 0 {
  1200. totalHours += hours
  1201. allocHours[alloc] = hours
  1202. }
  1203. }
  1204. // Distribute cost of service once total hours is calculated
  1205. for alloc, hours := range allocHours {
  1206. alloc.LoadBalancerCost += lb.TotalCost * hours / totalHours
  1207. }
  1208. for _, alloc := range allocs {
  1209. if alloc.LoadBalancers == nil {
  1210. alloc.LoadBalancers = kubecost.LbAllocations{}
  1211. }
  1212. if _, found := alloc.LoadBalancers[sKey.String()]; found {
  1213. alloc.LoadBalancers[sKey.String()].Cost += alloc.LoadBalancerCost
  1214. } else {
  1215. alloc.LoadBalancers[sKey.String()] = &kubecost.LbAllocation{
  1216. Service: sKey.Namespace + "/" + sKey.Service,
  1217. Cost: alloc.LoadBalancerCost,
  1218. Private: lb.Private,
  1219. Ip: lb.Ip,
  1220. }
  1221. }
  1222. }
  1223. // If there was no overlap apply to Unmounted pod
  1224. if len(allocHours) == 0 {
  1225. pod := getUnmountedPodForCluster(window, podMap, sKey.Cluster)
  1226. pod.Allocations[kubecost.UnmountedSuffix].LoadBalancerCost += lb.TotalCost
  1227. pod.Allocations[kubecost.UnmountedSuffix].Properties.Services = append(pod.Allocations[kubecost.UnmountedSuffix].Properties.Services, sKey.Service)
  1228. }
  1229. }
  1230. }
  1231. /* Node Helpers */
  1232. func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*nodePricing, resNodeCostPerCPUHr []*prom.QueryResult) {
  1233. for _, res := range resNodeCostPerCPUHr {
  1234. cluster, err := res.GetString(env.GetPromClusterLabel())
  1235. if err != nil {
  1236. cluster = env.GetClusterID()
  1237. }
  1238. node, err := res.GetString("node")
  1239. if err != nil {
  1240. log.Warnf("CostModel.ComputeAllocation: Node CPU cost query result missing field: \"%s\" for node \"%s\"", err, node)
  1241. continue
  1242. }
  1243. instanceType, err := res.GetString("instance_type")
  1244. if err != nil {
  1245. log.Warnf("CostModel.ComputeAllocation: Node CPU cost query result missing field: \"%s\" for node \"%s\"", err, node)
  1246. continue
  1247. }
  1248. providerID, err := res.GetString("provider_id")
  1249. if err != nil {
  1250. log.Warnf("CostModel.ComputeAllocation: Node CPU cost query result missing field: \"%s\" for node \"%s\"", err, node)
  1251. continue
  1252. }
  1253. key := newNodeKey(cluster, node)
  1254. if _, ok := nodeMap[key]; !ok {
  1255. nodeMap[key] = &nodePricing{
  1256. Name: node,
  1257. NodeType: instanceType,
  1258. ProviderID: provider.ParseID(providerID),
  1259. }
  1260. }
  1261. nodeMap[key].CostPerCPUHr = res.Values[0].Value
  1262. }
  1263. }
  1264. func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*nodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult) {
  1265. for _, res := range resNodeCostPerRAMGiBHr {
  1266. cluster, err := res.GetString(env.GetPromClusterLabel())
  1267. if err != nil {
  1268. cluster = env.GetClusterID()
  1269. }
  1270. node, err := res.GetString("node")
  1271. if err != nil {
  1272. log.Warnf("CostModel.ComputeAllocation: Node RAM cost query result missing field: \"%s\" for node \"%s\"", err, node)
  1273. continue
  1274. }
  1275. instanceType, err := res.GetString("instance_type")
  1276. if err != nil {
  1277. log.Warnf("CostModel.ComputeAllocation: Node RAM cost query result missing field: \"%s\" for node \"%s\"", err, node)
  1278. continue
  1279. }
  1280. providerID, err := res.GetString("provider_id")
  1281. if err != nil {
  1282. log.Warnf("CostModel.ComputeAllocation: Node RAM cost query result missing field: \"%s\" for node \"%s\"", err, node)
  1283. continue
  1284. }
  1285. key := newNodeKey(cluster, node)
  1286. if _, ok := nodeMap[key]; !ok {
  1287. nodeMap[key] = &nodePricing{
  1288. Name: node,
  1289. NodeType: instanceType,
  1290. ProviderID: provider.ParseID(providerID),
  1291. }
  1292. }
  1293. nodeMap[key].CostPerRAMGiBHr = res.Values[0].Value
  1294. }
  1295. }
  1296. func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*nodePricing, resNodeCostPerGPUHr []*prom.QueryResult) {
  1297. for _, res := range resNodeCostPerGPUHr {
  1298. cluster, err := res.GetString(env.GetPromClusterLabel())
  1299. if err != nil {
  1300. cluster = env.GetClusterID()
  1301. }
  1302. node, err := res.GetString("node")
  1303. if err != nil {
  1304. log.Warnf("CostModel.ComputeAllocation: Node GPU cost query result missing field: \"%s\" for node \"%s\"", err, node)
  1305. continue
  1306. }
  1307. instanceType, err := res.GetString("instance_type")
  1308. if err != nil {
  1309. log.Warnf("CostModel.ComputeAllocation: Node GPU cost query result missing field: \"%s\" for node \"%s\"", err, node)
  1310. continue
  1311. }
  1312. providerID, err := res.GetString("provider_id")
  1313. if err != nil {
  1314. log.Warnf("CostModel.ComputeAllocation: Node GPU cost query result missing field: \"%s\" for node \"%s\"", err, node)
  1315. continue
  1316. }
  1317. key := newNodeKey(cluster, node)
  1318. if _, ok := nodeMap[key]; !ok {
  1319. nodeMap[key] = &nodePricing{
  1320. Name: node,
  1321. NodeType: instanceType,
  1322. ProviderID: provider.ParseID(providerID),
  1323. }
  1324. }
  1325. nodeMap[key].CostPerGPUHr = res.Values[0].Value
  1326. }
  1327. }
  1328. func applyNodeSpot(nodeMap map[nodeKey]*nodePricing, resNodeIsSpot []*prom.QueryResult) {
  1329. for _, res := range resNodeIsSpot {
  1330. cluster, err := res.GetString(env.GetPromClusterLabel())
  1331. if err != nil {
  1332. cluster = env.GetClusterID()
  1333. }
  1334. node, err := res.GetString("node")
  1335. if err != nil {
  1336. log.Warnf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
  1337. continue
  1338. }
  1339. key := newNodeKey(cluster, node)
  1340. if _, ok := nodeMap[key]; !ok {
  1341. log.Warnf("CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
  1342. continue
  1343. }
  1344. nodeMap[key].Preemptible = res.Values[0].Value > 0
  1345. }
  1346. }
  1347. func applyNodeDiscount(nodeMap map[nodeKey]*nodePricing, cm *CostModel) {
  1348. if cm == nil {
  1349. return
  1350. }
  1351. c, err := cm.Provider.GetConfig()
  1352. if err != nil {
  1353. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1354. return
  1355. }
  1356. discount, err := ParsePercentString(c.Discount)
  1357. if err != nil {
  1358. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1359. return
  1360. }
  1361. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1362. if err != nil {
  1363. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1364. return
  1365. }
  1366. for _, node := range nodeMap {
  1367. // TODO GKE Reserved Instances into account
  1368. node.Discount = cm.Provider.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  1369. node.CostPerCPUHr *= (1.0 - node.Discount)
  1370. node.CostPerRAMGiBHr *= (1.0 - node.Discount)
  1371. }
  1372. }
// applyNodesToPod prices each allocation's CPU, RAM, and GPU usage using the
// pricing of the node the allocation ran on, resolved via cm.getNodePricing.
// It also stamps the node's provider ID onto the allocation properties.
func (cm *CostModel) applyNodesToPod(podMap map[podKey]*pod, nodeMap map[nodeKey]*nodePricing) {
	for _, pod := range podMap {
		for _, alloc := range pod.Allocations {
			cluster := alloc.Properties.Cluster
			nodeName := alloc.Properties.Node
			thisNodeKey := newNodeKey(cluster, nodeName)
			// NOTE(review): assumes getNodePricing never returns nil; a nil
			// result would panic on the dereferences below — confirm against
			// getNodePricing's fallback behavior.
			node := cm.getNodePricing(nodeMap, thisNodeKey)
			alloc.Properties.ProviderID = node.ProviderID
			alloc.CPUCost = alloc.CPUCoreHours * node.CostPerCPUHr
			// RAMByteHours is converted from byte-hours to GiB-hours before
			// applying the per-GiB-hour rate.
			alloc.RAMCost = (alloc.RAMByteHours / 1024 / 1024 / 1024) * node.CostPerRAMGiBHr
			alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
		}
	}
}
  1387. // getCustomNodePricing converts the CostModel's configured custom pricing
  1388. // values into a nodePricing instance.
  1389. func (cm *CostModel) getCustomNodePricing(spot bool, providerID string) *nodePricing {
  1390. customPricingConfig, err := cm.Provider.GetConfig()
  1391. if err != nil {
  1392. return nil
  1393. }
  1394. cpuCostStr := customPricingConfig.CPU
  1395. gpuCostStr := customPricingConfig.GPU
  1396. ramCostStr := customPricingConfig.RAM
  1397. if spot {
  1398. cpuCostStr = customPricingConfig.SpotCPU
  1399. gpuCostStr = customPricingConfig.SpotGPU
  1400. ramCostStr = customPricingConfig.SpotRAM
  1401. }
  1402. node := &nodePricing{
  1403. Source: "custom",
  1404. ProviderID: providerID,
  1405. }
  1406. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1407. if err != nil {
  1408. log.Warnf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1409. }
  1410. node.CostPerCPUHr = costPerCPUHr
  1411. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1412. if err != nil {
  1413. log.Warnf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1414. }
  1415. node.CostPerGPUHr = costPerGPUHr
  1416. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1417. if err != nil {
  1418. log.Warnf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1419. }
  1420. node.CostPerRAMGiBHr = costPerRAMHr
  1421. return node
  1422. }
  1423. // getNodePricing determines node pricing, given a key and a mapping from keys
  1424. // to their nodePricing instances, as well as the custom pricing configuration
  1425. // inherent to the CostModel instance. If custom pricing is set, use that. If
  1426. // not, use the pricing defined by the given key. If that doesn't exist, fall
  1427. // back on custom pricing as a default.
  1428. func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*nodePricing, nodeKey nodeKey) *nodePricing {
  1429. // Find the relevant nodePricing, if it exists. If not, substitute the
  1430. // custom nodePricing as a default.
  1431. node, ok := nodeMap[nodeKey]
  1432. if !ok || node == nil {
  1433. if nodeKey.Node != "" {
  1434. log.DedupedWarningf(5, "CostModel: failed to find node for %s", nodeKey)
  1435. }
  1436. // since the node pricing data is not found, and this won't change for the duration of the allocation
  1437. // build process, we can update the node map with the defaults to prevent future failed lookups
  1438. nodeMap[nodeKey] = cm.getCustomNodePricing(false, "")
  1439. return nodeMap[nodeKey]
  1440. }
  1441. // If custom pricing is enabled and can be retrieved, override detected
  1442. // node pricing with the custom values.
  1443. customPricingConfig, err := cm.Provider.GetConfig()
  1444. if err != nil {
  1445. log.Warnf("CostModel: failed to load custom pricing: %s", err)
  1446. }
  1447. if provider.CustomPricesEnabled(cm.Provider) && customPricingConfig != nil {
  1448. return cm.getCustomNodePricing(node.Preemptible, node.ProviderID)
  1449. }
  1450. node.Source = "prometheus"
  1451. // If any of the values are NaN or zero, replace them with the custom
  1452. // values as default.
  1453. // TODO:CLEANUP can't we parse these custom prices once? why do we store
  1454. // them as strings like this?
  1455. if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
  1456. log.Warnf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
  1457. cpuCostStr := customPricingConfig.CPU
  1458. if node.Preemptible {
  1459. cpuCostStr = customPricingConfig.SpotCPU
  1460. }
  1461. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1462. if err != nil {
  1463. log.Warnf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1464. }
  1465. node.CostPerCPUHr = costPerCPUHr
  1466. node.Source += "/customCPU"
  1467. }
  1468. if math.IsNaN(node.CostPerGPUHr) {
  1469. log.Warnf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
  1470. gpuCostStr := customPricingConfig.GPU
  1471. if node.Preemptible {
  1472. gpuCostStr = customPricingConfig.SpotGPU
  1473. }
  1474. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1475. if err != nil {
  1476. log.Warnf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1477. }
  1478. node.CostPerGPUHr = costPerGPUHr
  1479. node.Source += "/customGPU"
  1480. }
  1481. if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
  1482. log.Warnf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
  1483. ramCostStr := customPricingConfig.RAM
  1484. if node.Preemptible {
  1485. ramCostStr = customPricingConfig.SpotRAM
  1486. }
  1487. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1488. if err != nil {
  1489. log.Warnf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1490. }
  1491. node.CostPerRAMGiBHr = costPerRAMHr
  1492. node.Source += "/customRAM"
  1493. }
  1494. // Double check each for NaNs, as there is a chance that our custom pricing
  1495. // config could, itself, contain NaNs...
  1496. if math.IsNaN(node.CostPerCPUHr) || math.IsInf(node.CostPerCPUHr, 0) {
  1497. log.Warnf("CostModel: %s: node pricing has illegal CPU value: %v (setting to 0.0)", nodeKey, node.CostPerCPUHr)
  1498. node.CostPerCPUHr = 0.0
  1499. }
  1500. if math.IsNaN(node.CostPerGPUHr) || math.IsInf(node.CostPerGPUHr, 0) {
  1501. log.Warnf("CostModel: %s: node pricing has illegal RAM value: %v (setting to 0.0)", nodeKey, node.CostPerGPUHr)
  1502. node.CostPerGPUHr = 0.0
  1503. }
  1504. if math.IsNaN(node.CostPerRAMGiBHr) || math.IsInf(node.CostPerRAMGiBHr, 0) {
  1505. log.Warnf("CostModel: %s: node pricing has illegal RAM value: %v (setting to 0.0)", nodeKey, node.CostPerRAMGiBHr)
  1506. node.CostPerRAMGiBHr = 0.0
  1507. }
  1508. return node
  1509. }
  1510. /* PV/PVC Helpers */
  1511. func buildPVMap(resolution time.Duration, pvMap map[pvKey]*pv, resPVCostPerGiBHour, resPVActiveMins, resPVMeta []*prom.QueryResult, window kubecost.Window) {
  1512. for _, result := range resPVActiveMins {
  1513. key, err := resultPVKey(result, env.GetPromClusterLabel(), "persistentvolume")
  1514. if err != nil {
  1515. log.Warnf("CostModel.ComputeAllocation: pv bytes query result missing field: %s", err)
  1516. continue
  1517. }
  1518. pvStart, pvEnd := calculateStartAndEnd(result, resolution, window)
  1519. if pvStart.IsZero() || pvEnd.IsZero() {
  1520. log.Warnf("CostModel.ComputeAllocation: pv %s has no running time", key)
  1521. }
  1522. pvMap[key] = &pv{
  1523. Cluster: key.Cluster,
  1524. Name: key.PersistentVolume,
  1525. Start: pvStart,
  1526. End: pvEnd,
  1527. }
  1528. }
  1529. for _, result := range resPVCostPerGiBHour {
  1530. key, err := resultPVKey(result, env.GetPromClusterLabel(), "volumename")
  1531. if err != nil {
  1532. log.Warnf("CostModel.ComputeAllocation: thisPV bytes query result missing field: %s", err)
  1533. continue
  1534. }
  1535. if _, ok := pvMap[key]; !ok {
  1536. pvMap[key] = &pv{
  1537. Cluster: key.Cluster,
  1538. Name: key.PersistentVolume,
  1539. }
  1540. }
  1541. pvMap[key].CostPerGiBHour = result.Values[0].Value
  1542. }
  1543. for _, result := range resPVMeta {
  1544. key, err := resultPVKey(result, env.GetPromClusterLabel(), "persistentvolume")
  1545. if err != nil {
  1546. log.Warnf("error getting key for PV: %v", err)
  1547. continue
  1548. }
  1549. // only add metadata for disks that exist in the other metrics
  1550. if _, ok := pvMap[key]; ok {
  1551. provId, err := result.GetString("provider_id")
  1552. if err != nil {
  1553. log.Warnf("error getting provider id for PV %v: %v", key, err)
  1554. continue
  1555. }
  1556. pvMap[key].ProviderID = provId
  1557. }
  1558. }
  1559. }
  1560. func applyPVBytes(pvMap map[pvKey]*pv, resPVBytes []*prom.QueryResult) {
  1561. for _, res := range resPVBytes {
  1562. key, err := resultPVKey(res, env.GetPromClusterLabel(), "persistentvolume")
  1563. if err != nil {
  1564. log.Warnf("CostModel.ComputeAllocation: pv bytes query result missing field: %s", err)
  1565. continue
  1566. }
  1567. if _, ok := pvMap[key]; !ok {
  1568. log.Warnf("CostModel.ComputeAllocation: pv bytes result for missing pv: %s", err)
  1569. continue
  1570. }
  1571. pvBytesUsed := res.Values[0].Value
  1572. if pvBytesUsed < PV_USAGE_SANITY_LIMIT_BYTES {
  1573. pvMap[key].Bytes = pvBytesUsed
  1574. } else {
  1575. pvMap[key].Bytes = 0
  1576. log.Warnf("PV usage exceeds sanity limit, clamping to zero")
  1577. }
  1578. }
  1579. }
  1580. func buildPVCMap(resolution time.Duration, pvcMap map[pvcKey]*pvc, pvMap map[pvKey]*pv, resPVCInfo []*prom.QueryResult, window kubecost.Window) {
  1581. for _, res := range resPVCInfo {
  1582. cluster, err := res.GetString(env.GetPromClusterLabel())
  1583. if err != nil {
  1584. cluster = env.GetClusterID()
  1585. }
  1586. values, err := res.GetStrings("persistentvolumeclaim", "storageclass", "volumename", "namespace")
  1587. if err != nil {
  1588. log.DedupedWarningf(10, "CostModel.ComputeAllocation: pvc info query result missing field: %s", err)
  1589. continue
  1590. }
  1591. namespace := values["namespace"]
  1592. name := values["persistentvolumeclaim"]
  1593. volume := values["volumename"]
  1594. storageClass := values["storageclass"]
  1595. pvKey := newPVKey(cluster, volume)
  1596. pvcKey := newPVCKey(cluster, namespace, name)
  1597. pvcStart, pvcEnd := calculateStartAndEnd(res, resolution, window)
  1598. if pvcStart.IsZero() || pvcEnd.IsZero() {
  1599. log.Warnf("CostModel.ComputeAllocation: pvc %s has no running time", pvcKey)
  1600. }
  1601. if _, ok := pvMap[pvKey]; !ok {
  1602. continue
  1603. }
  1604. pvMap[pvKey].StorageClass = storageClass
  1605. if _, ok := pvcMap[pvcKey]; !ok {
  1606. pvcMap[pvcKey] = &pvc{}
  1607. }
  1608. pvcMap[pvcKey].Name = name
  1609. pvcMap[pvcKey].Namespace = namespace
  1610. pvcMap[pvcKey].Cluster = cluster
  1611. pvcMap[pvcKey].Volume = pvMap[pvKey]
  1612. pvcMap[pvcKey].Start = pvcStart
  1613. pvcMap[pvcKey].End = pvcEnd
  1614. }
  1615. }
  1616. func applyPVCBytesRequested(pvcMap map[pvcKey]*pvc, resPVCBytesRequested []*prom.QueryResult) {
  1617. for _, res := range resPVCBytesRequested {
  1618. key, err := resultPVCKey(res, env.GetPromClusterLabel(), "namespace", "persistentvolumeclaim")
  1619. if err != nil {
  1620. continue
  1621. }
  1622. if _, ok := pvcMap[key]; !ok {
  1623. continue
  1624. }
  1625. pvcMap[key].Bytes = res.Values[0].Value
  1626. }
  1627. }
// buildPodPVCMap maps each pod key to the list of pvcs mounted by that pod,
// using the pod-PVC allocation query results. When ingestPodUID is true, a
// result's name-based pod key is expanded into the UID-qualified keys found
// in podUIDKeyMap; otherwise the name-based key is used directly. Mounting a
// pvc marks it Mounted.
func buildPodPVCMap(podPVCMap map[podKey][]*pvc, pvMap map[pvKey]*pv, pvcMap map[pvcKey]*pvc, podMap map[podKey]*pod, resPodPVCAllocation []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) {
	for _, res := range resPodPVCAllocation {
		// Fall back to the configured cluster ID if the label is absent.
		cluster, err := res.GetString(env.GetPromClusterLabel())
		if err != nil {
			cluster = env.GetClusterID()
		}

		values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace")
		if err != nil {
			log.DedupedWarningf(5, "CostModel.ComputeAllocation: pvc allocation query result missing field: %s", err)
			continue
		}

		namespace := values["namespace"]
		pod := values["pod"]
		name := values["persistentvolumeclaim"]
		volume := values["persistentvolume"]

		key := newPodKey(cluster, namespace, pod)
		pvKey := newPVKey(cluster, volume)
		pvcKey := newPVCKey(cluster, namespace, name)

		// Resolve the pod key(s) this result applies to: either the single
		// name-based key, or every UID-based key recorded for it.
		var keys []podKey
		if ingestPodUID {
			if uidKeys, ok := podUIDKeyMap[key]; ok {
				keys = append(keys, uidKeys...)
			}
		} else {
			keys = []podKey{key}
		}

		for _, key := range keys {
			if _, ok := pvMap[pvKey]; !ok {
				log.DedupedWarningf(5, "CostModel.ComputeAllocation: pv missing for pvc allocation query result: %s", pvKey)
				continue
			}

			// NOTE: this entry is created before the pvc/pod checks below,
			// so a pod can end up with an empty (non-nil) pvc list if those
			// checks fail.
			if _, ok := podPVCMap[key]; !ok {
				podPVCMap[key] = []*pvc{}
			}

			pvc, ok := pvcMap[pvcKey]
			if !ok {
				log.DedupedWarningf(5, "CostModel.ComputeAllocation: pvc missing for pvc allocation query: %s", pvcKey)
				continue
			}

			// "pod" here shadows the pod-name string above with the *pod
			// struct looked up from podMap.
			if pod, ok := podMap[key]; !ok || len(pod.Allocations) <= 0 {
				log.DedupedWarningf(10, "CostModel.ComputeAllocation: pvc %s for missing pod %s", pvcKey, key)
				continue
			}

			pvc.Mounted = true

			podPVCMap[key] = append(podPVCMap[key], pvc)
		}
	}
}
// applyPVCsToPods distributes the cost of each pvc's backing volume across
// the allocations of every pod that mounted it, weighting each pod's share
// by the overlap of its run window with the pvc's lifetime (via interval
// coefficients) and splitting evenly across the pod's containers. Pods that
// are missing or have no allocations fall back to the namespace-level
// unmounted pod.
func applyPVCsToPods(window kubecost.Window, podMap map[podKey]*pod, podPVCMap map[podKey][]*pvc, pvcMap map[pvcKey]*pvc) {
	// Because PVCs can be shared among pods, the respective pv cost
	// needs to be evenly distributed to those pods based on time
	// running, as well as the amount of time the pvc was shared.

	// Build a relation between every pvc to the pods that mount it
	// and a window representing the interval during which they
	// were associated.
	pvcPodWindowMap := make(map[pvcKey]map[podKey]kubecost.Window)

	for thisPodKey, thisPod := range podMap {
		if pvcs, ok := podPVCMap[thisPodKey]; ok {
			for _, thisPVC := range pvcs {
				// Determine the (start, end) of the relationship between the
				// given pvc and the associated Allocation so that a precise
				// number of hours can be used to compute cumulative cost.
				// The relation window is the intersection of the pod's and
				// the pvc's windows.
				s, e := thisPod.Start, thisPod.End
				if thisPVC.Start.After(thisPod.Start) {
					s = thisPVC.Start
				}
				if thisPVC.End.Before(thisPod.End) {
					e = thisPVC.End
				}

				thisPVCKey := thisPVC.key()
				if pvcPodWindowMap[thisPVCKey] == nil {
					pvcPodWindowMap[thisPVCKey] = make(map[podKey]kubecost.Window)
				}

				pvcPodWindowMap[thisPVCKey][thisPodKey] = kubecost.NewWindow(&s, &e)
			}
		}
	}

	for thisPVCKey, podWindowMap := range pvcPodWindowMap {
		// Build out a pv price coefficient for each pod with a pvc. Each
		// pvc-pod relation needs a coefficient which modifies the pv cost
		// such that pv costs can be shared between all pods using that pvc.

		// Get single-point intervals from alloc-pvc relation windows.
		intervals := getIntervalPointsFromWindows(podWindowMap)

		pvc, ok := pvcMap[thisPVCKey]
		if !ok {
			log.Warnf("Allocation: Compute: applyPVCsToPods: missing pvc with key %s", thisPVCKey)
			continue
		}
		if pvc == nil {
			log.Warnf("Allocation: Compute: applyPVCsToPods: nil pvc with key %s", thisPVCKey)
			continue
		}

		// Determine coefficients for each pvc-pod relation.
		sharedPVCCostCoefficients, err := getPVCCostCoefficients(intervals, pvc)
		if err != nil {
			log.Warnf("Allocation: Compute: applyPVCsToPods: getPVCCostCoefficients: %s", err)
			continue
		}

		// Distribute pvc costs to Allocations
		for thisPodKey, coeffComponents := range sharedPVCCostCoefficients {
			pod, ok2 := podMap[thisPodKey]
			// If pod does not exist or the pod does not have any allocations
			// get unmounted pod for cluster
			if !ok2 || len(pod.Allocations) == 0 {
				// Get namespace unmounted pod, as pvc will have a namespace
				pod = getUnmountedPodForNamespace(window, podMap, pvc.Cluster, pvc.Namespace)
			}
			for _, alloc := range pod.Allocations {
				// Cost is accrued over the pod's own run window, in hours.
				s, e := pod.Start, pod.End
				minutes := e.Sub(s).Minutes()
				hrs := minutes / 60.0

				gib := pvc.Bytes / 1024 / 1024 / 1024
				cost := pvc.Volume.CostPerGiBHour * gib * hrs

				byteHours := pvc.Bytes * hrs

				coef := getCoefficientFromComponents(coeffComponents)

				// Apply the size and cost of the pv to the allocation, each
				// weighted by count (i.e. the number of containers in the pod)

				// record the amount of total PVBytes Hours attributable to a given pv
				if alloc.PVs == nil {
					alloc.PVs = kubecost.PVAllocations{}
				}
				pvKey := kubecost.PVKey{
					Cluster: pvc.Volume.Cluster,
					Name:    pvc.Volume.Name,
				}

				// Both Cost and byteHours should be multiplied by the coef and divided by count
				// so that if all allocations with a given pv key are summed the result of those
				// would be equal to the values of the original pv
				count := float64(len(pod.Allocations))
				alloc.PVs[pvKey] = &kubecost.PVAllocation{
					ByteHours:  byteHours * coef / count,
					Cost:       cost * coef / count,
					ProviderID: pvc.Volume.ProviderID,
				}
			}
		}
	}
}
  1766. func applyUnmountedPVs(window kubecost.Window, podMap map[podKey]*pod, pvMap map[pvKey]*pv, pvcMap map[pvcKey]*pvc) {
  1767. for _, pv := range pvMap {
  1768. mounted := false
  1769. for _, pvc := range pvcMap {
  1770. if pvc.Volume == nil {
  1771. continue
  1772. }
  1773. if pvc.Volume == pv {
  1774. mounted = true
  1775. break
  1776. }
  1777. }
  1778. if !mounted {
  1779. // a pv without a pvc will not have a namespace, so get the cluster unmounted pod
  1780. pod := getUnmountedPodForCluster(window, podMap, pv.Cluster)
  1781. // Calculate pv Cost
  1782. // Unmounted pv should have correct keyso it can still reconcile
  1783. thisPVKey := kubecost.PVKey{
  1784. Cluster: pv.Cluster,
  1785. Name: pv.Name,
  1786. }
  1787. gib := pv.Bytes / 1024 / 1024 / 1024
  1788. hrs := pv.minutes() / 60.0
  1789. cost := pv.CostPerGiBHour * gib * hrs
  1790. unmountedPVs := kubecost.PVAllocations{
  1791. thisPVKey: {
  1792. ByteHours: pv.Bytes * hrs,
  1793. Cost: cost,
  1794. },
  1795. }
  1796. pod.Allocations[kubecost.UnmountedSuffix].PVs = pod.Allocations[kubecost.UnmountedSuffix].PVs.Add(unmountedPVs)
  1797. }
  1798. }
  1799. }
  1800. func applyUnmountedPVCs(window kubecost.Window, podMap map[podKey]*pod, pvcMap map[pvcKey]*pvc) {
  1801. for _, pvc := range pvcMap {
  1802. if !pvc.Mounted && pvc.Volume != nil {
  1803. // Get namespace unmounted pod, as pvc will have a namespace
  1804. pod := getUnmountedPodForNamespace(window, podMap, pvc.Cluster, pvc.Namespace)
  1805. // Calculate pv Cost
  1806. // Unmounted pv should have correct key so it can still reconcile
  1807. thisPVKey := kubecost.PVKey{
  1808. Cluster: pvc.Volume.Cluster,
  1809. Name: pvc.Volume.Name,
  1810. }
  1811. // Use the Volume Bytes here because pvc bytes could be different,
  1812. // however the pv bytes are what are going to determine cost
  1813. gib := pvc.Volume.Bytes / 1024 / 1024 / 1024
  1814. hrs := pvc.Volume.minutes() / 60.0
  1815. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  1816. unmountedPVs := kubecost.PVAllocations{
  1817. thisPVKey: {
  1818. ByteHours: pvc.Volume.Bytes * hrs,
  1819. Cost: cost,
  1820. },
  1821. }
  1822. pod.Allocations[kubecost.UnmountedSuffix].PVs = pod.Allocations[kubecost.UnmountedSuffix].PVs.Add(unmountedPVs)
  1823. }
  1824. }
  1825. }
  1826. /* Helper Helpers */
  1827. // getUnmountedPodForCluster retrieve the unmounted pod for a cluster and create it if it does not exist
  1828. func getUnmountedPodForCluster(window kubecost.Window, podMap map[podKey]*pod, cluster string) *pod {
  1829. container := kubecost.UnmountedSuffix
  1830. podName := kubecost.UnmountedSuffix
  1831. namespace := kubecost.UnmountedSuffix
  1832. node := ""
  1833. thisPodKey := getUnmountedPodKey(cluster)
  1834. // Initialize pod and container if they do not already exist
  1835. thisPod, ok := podMap[thisPodKey]
  1836. if !ok {
  1837. thisPod = &pod{
  1838. Window: window.Clone(),
  1839. Start: *window.Start(),
  1840. End: *window.End(),
  1841. Key: thisPodKey,
  1842. Allocations: map[string]*kubecost.Allocation{},
  1843. }
  1844. thisPod.appendContainer(container)
  1845. thisPod.Allocations[container].Properties.Cluster = cluster
  1846. thisPod.Allocations[container].Properties.Node = node
  1847. thisPod.Allocations[container].Properties.Namespace = namespace
  1848. thisPod.Allocations[container].Properties.Pod = podName
  1849. thisPod.Allocations[container].Properties.Container = container
  1850. thisPod.Node = node
  1851. podMap[thisPodKey] = thisPod
  1852. }
  1853. return thisPod
  1854. }
  1855. // getUnmountedPodForNamespace is as getUnmountedPodForCluster, but keys allocation property pod/namespace field off namespace
  1856. // This creates or adds allocations to an unmounted pod in the specified namespace, rather than in __unmounted__
  1857. func getUnmountedPodForNamespace(window kubecost.Window, podMap map[podKey]*pod, cluster string, namespace string) *pod {
  1858. container := kubecost.UnmountedSuffix
  1859. podName := fmt.Sprintf("%s-unmounted-pvcs", namespace)
  1860. node := ""
  1861. thisPodKey := newPodKey(cluster, namespace, podName)
  1862. // Initialize pod and container if they do not already exist
  1863. thisPod, ok := podMap[thisPodKey]
  1864. if !ok {
  1865. thisPod = &pod{
  1866. Window: window.Clone(),
  1867. Start: *window.Start(),
  1868. End: *window.End(),
  1869. Key: thisPodKey,
  1870. Allocations: map[string]*kubecost.Allocation{},
  1871. }
  1872. thisPod.appendContainer(container)
  1873. thisPod.Allocations[container].Properties.Cluster = cluster
  1874. thisPod.Allocations[container].Properties.Node = node
  1875. thisPod.Allocations[container].Properties.Namespace = namespace
  1876. thisPod.Allocations[container].Properties.Pod = podName
  1877. thisPod.Allocations[container].Properties.Container = container
  1878. thisPod.Node = node
  1879. podMap[thisPodKey] = thisPod
  1880. }
  1881. return thisPod
  1882. }
  1883. func calculateStartAndEnd(result *prom.QueryResult, resolution time.Duration, window kubecost.Window) (time.Time, time.Time) {
  1884. // Start and end for a range vector are pulled from the timestamps of the
  1885. // first and final values in the range. There is no "offsetting" required
  1886. // of the start or the end, as we used to do. If you query for a duration
  1887. // of time that is divisible by the given resolution, and set the end time
  1888. // to be precisely the end of the window, Prometheus should give all the
  1889. // relevant timestamps.
  1890. //
  1891. // E.g. avg(kube_pod_container_status_running{}) by (pod, namespace)[1h:1m]
  1892. // with time=01:00:00 will return, for a pod running the entire time,
  1893. // 61 timestamps where the first is 00:00:00 and the last is 01:00:00.
  1894. s := time.Unix(int64(result.Values[0].Timestamp), 0).UTC()
  1895. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).UTC()
  1896. // The only corner-case here is what to do if you only get one timestamp.
  1897. // This dilemma still requires the use of the resolution, and can be
  1898. // clamped using the window. In this case, we want to honor the existence
  1899. // of the pod by giving "one resolution" worth of duration, half on each
  1900. // side of the given timestamp.
  1901. if s.Equal(e) {
  1902. s = s.Add(-1 * resolution / time.Duration(2))
  1903. e = e.Add(resolution / time.Duration(2))
  1904. }
  1905. if s.Before(*window.Start()) {
  1906. s = *window.Start()
  1907. }
  1908. if e.After(*window.End()) {
  1909. e = *window.End()
  1910. }
  1911. return s, e
  1912. }