metricsquerier.go 134 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839
  1. package prom
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/core/pkg/source"
  7. "github.com/opencost/opencost/core/pkg/util/timeutil"
  8. prometheus "github.com/prometheus/client_golang/api"
  9. )
  10. //--------------------------------------------------------------------------
  11. // PrometheusMetricsQuerier
  12. //--------------------------------------------------------------------------
  // PrometheusMetricsQueryLogFormat is the format string used when logging a
  // metric query before it is sent to the prometheus instance. Its verbs are, in
  // order: the query name (%s), the evaluation time as a unix timestamp (%d), and
  // the rendered query text (%s).
  const PrometheusMetricsQueryLogFormat = "[PrometheusMetricsQuerier][%s][At Time: %d]: %s"
  16. // PrometheusMetricsQuerier is the implementation of the data source's MetricsQuerier interface for Prometheus.
  17. type PrometheusMetricsQuerier struct {
  18. promConfig *OpenCostPrometheusConfig
  19. promClient prometheus.Client
  20. promContexts *ContextFactory
  21. }
  22. func newPrometheusMetricsQuerier(
  23. promConfig *OpenCostPrometheusConfig,
  24. promClient prometheus.Client,
  25. promContexts *ContextFactory,
  26. ) *PrometheusMetricsQuerier {
  27. return &PrometheusMetricsQuerier{
  28. promConfig: promConfig,
  29. promClient: promClient,
  30. promContexts: promContexts,
  31. }
  32. }
  33. func (pds *PrometheusMetricsQuerier) QueryPVPricePerGiBHour(start, end time.Time) *source.Future[source.PVPricePerGiBHourResult] {
  34. const queryName = "QueryPVPricePerGiBHour"
  35. const pvCostQuery = `avg(avg_over_time(pv_hourly_cost{%s}[%s])) by (%s, persistentvolume, volumename, uid, provider_id)`
  36. cfg := pds.promConfig
  37. durStr := timeutil.DurationString(end.Sub(start))
  38. if durStr == "" {
  39. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  40. }
  41. queryPVCost := fmt.Sprintf(pvCostQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  42. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPVCost)
  43. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  44. return source.NewFuture(source.DecodePVPricePerGiBHourResult, ctx.QueryAtTime(queryPVCost, end))
  45. }
  46. func (pds *PrometheusMetricsQuerier) QueryPVUsedAverage(start, end time.Time) *source.Future[source.PVUsedAvgResult] {
  47. const queryName = "QueryPVUsedAverage"
  48. const pvUsedAverageQuery = `avg(avg_over_time(kubelet_volume_stats_used_bytes{%s}[%s])) by (%s, persistentvolumeclaim, namespace, uid)`
  49. cfg := pds.promConfig
  50. durStr := timeutil.DurationString(end.Sub(start))
  51. if durStr == "" {
  52. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  53. }
  54. queryPVUsedAvg := fmt.Sprintf(pvUsedAverageQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  55. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPVUsedAvg)
  56. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  57. return source.NewFuture(source.DecodePVUsedAvgResult, ctx.QueryAtTime(queryPVUsedAvg, end))
  58. }
  59. func (pds *PrometheusMetricsQuerier) QueryPVUsedMax(start, end time.Time) *source.Future[source.PVUsedMaxResult] {
  60. const queryName = "QueryPVUsedMax"
  61. const pvUsedMaxQuery = `max(max_over_time(kubelet_volume_stats_used_bytes{%s}[%s])) by (%s, persistentvolumeclaim, namespace, uid)`
  62. cfg := pds.promConfig
  63. durStr := timeutil.DurationString(end.Sub(start))
  64. if durStr == "" {
  65. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  66. }
  67. queryPVUsedMax := fmt.Sprintf(pvUsedMaxQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  68. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPVUsedMax)
  69. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  70. return source.NewFuture(source.DecodePVUsedMaxResult, ctx.QueryAtTime(queryPVUsedMax, end))
  71. }
  72. func (pds *PrometheusMetricsQuerier) QueryPVCUptime(start, end time.Time) *source.Future[source.UptimeResult] {
  73. const queryName = "QueryPVCUptime"
  74. const queryFmtPVCUptime = `avg(kube_persistentvolumeclaim_info{%s}) by (%s, uid)[%s:%dm]`
  75. cfg := pds.promConfig
  76. minsPerResolution := cfg.DataResolutionMinutes
  77. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  78. if durStr == "" {
  79. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  80. }
  81. queryPVCUptime := fmt.Sprintf(queryFmtPVCUptime, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  82. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPVCUptime)
  83. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  84. return source.NewFuture(source.DecodeUptimeResult, ctx.QueryAtTime(queryPVCUptime, end))
  85. }
  86. func (pds *PrometheusMetricsQuerier) QueryPVCInfo(start, end time.Time) *source.Future[source.PVCInfoResult] {
  87. const queryName = "QueryPVCInfo"
  88. const queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != "", %s}) by (persistentvolumeclaim, storageclass, volumename, namespace, uid, %s)[%s:%dm]`
  89. cfg := pds.promConfig
  90. minsPerResolution := cfg.DataResolutionMinutes
  91. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  92. if durStr == "" {
  93. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  94. }
  95. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  96. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPVCInfo)
  97. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  98. return source.NewFuture(source.DecodePVCInfoResult, ctx.QueryAtTime(queryPVCInfo, end))
  99. }
  100. func (pds *PrometheusMetricsQuerier) QueryPVActiveMinutes(start, end time.Time) *source.Future[source.PVActiveMinutesResult] {
  101. const queryName = "QueryPVActiveMinutes"
  102. const pvActiveMinsQuery = `avg(kube_persistentvolume_capacity_bytes{%s}) by (%s, persistentvolume, uid)[%s:%dm]`
  103. cfg := pds.promConfig
  104. minsPerResolution := cfg.DataResolutionMinutes
  105. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  106. if durStr == "" {
  107. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  108. }
  109. queryPVActiveMins := fmt.Sprintf(pvActiveMinsQuery, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  110. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPVActiveMins)
  111. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  112. return source.NewFuture(source.DecodePVActiveMinutesResult, ctx.QueryAtTime(queryPVActiveMins, end))
  113. }
  114. func (pds *PrometheusMetricsQuerier) QueryLocalStorageUsedAvg(start, end time.Time) *source.Future[source.LocalStorageUsedAvgResult] {
  115. const queryName = "QueryLocalStorageUsedAvg"
  116. const localStorageUsedAvgQuery = `avg(sum(avg_over_time(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}[%s])) by (instance, device, %s, job)) by (instance, device, %s)`
  117. cfg := pds.promConfig
  118. durStr := timeutil.DurationString(end.Sub(start))
  119. if durStr == "" {
  120. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  121. }
  122. queryLocalStorageUsedAvg := fmt.Sprintf(localStorageUsedAvgQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel, cfg.ClusterLabel)
  123. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryLocalStorageUsedAvg)
  124. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  125. return source.NewFuture(source.DecodeLocalStorageUsedAvgResult, ctx.QueryAtTime(queryLocalStorageUsedAvg, end))
  126. }
  127. func (pds *PrometheusMetricsQuerier) QueryLocalStorageUsedMax(start, end time.Time) *source.Future[source.LocalStorageUsedMaxResult] {
  128. const queryName = "QueryLocalStorageUsedMax"
  129. const localStorageUsedMaxQuery = `max(sum(max_over_time(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}[%s])) by (instance, device, %s, job)) by (instance, device, %s)`
  130. cfg := pds.promConfig
  131. durStr := timeutil.DurationString(end.Sub(start))
  132. if durStr == "" {
  133. panic("failed to parse duration string passed to QueryLocalStorageUsedMax")
  134. }
  135. queryLocalStorageUsedMax := fmt.Sprintf(localStorageUsedMaxQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel, cfg.ClusterLabel)
  136. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryLocalStorageUsedMax)
  137. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  138. return source.NewFuture(source.DecodeLocalStorageUsedMaxResult, ctx.QueryAtTime(queryLocalStorageUsedMax, end))
  139. }
  140. func (pds *PrometheusMetricsQuerier) QueryLocalStorageBytes(start, end time.Time) *source.Future[source.LocalStorageBytesResult] {
  141. const queryName = "QueryLocalStorageBytes"
  142. const localStorageBytesQuery = `avg_over_time(sum(container_fs_limit_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm])`
  143. cfg := pds.promConfig
  144. minsPerResolution := cfg.DataResolutionMinutes
  145. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  146. if durStr == "" {
  147. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  148. }
  149. queryLocalStorageBytes := fmt.Sprintf(localStorageBytesQuery, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  150. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryLocalStorageBytes)
  151. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  152. return source.NewFuture(source.DecodeLocalStorageBytesResult, ctx.QueryAtTime(queryLocalStorageBytes, end))
  153. }
  154. func (pds *PrometheusMetricsQuerier) QueryLocalStorageActiveMinutes(start, end time.Time) *source.Future[source.LocalStorageActiveMinutesResult] {
  155. const queryName = "QueryLocalStorageActiveMinutes"
  156. const localStorageActiveMinutesQuery = `count(node_total_hourly_cost{%s}) by (%s, node, uid, instance, provider_id)[%s:%dm]`
  157. cfg := pds.promConfig
  158. minsPerResolution := cfg.DataResolutionMinutes
  159. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  160. if durStr == "" {
  161. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  162. }
  163. queryLocalStorageActiveMins := fmt.Sprintf(localStorageActiveMinutesQuery, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  164. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryLocalStorageActiveMins)
  165. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  166. return source.NewFuture(source.DecodeLocalStorageActiveMinutesResult, ctx.QueryAtTime(queryLocalStorageActiveMins, end))
  167. }
  168. func (pds *PrometheusMetricsQuerier) QueryNodeInfo(start, end time.Time) *source.Future[source.NodeInfoResult] {
  169. const queryName = "QueryNodeInfo"
  170. const queryFmtNodeInfo = `avg(avg_over_time(node_info{%s}[%s])) by (%s, node, uid, provider_id, instance_type)`
  171. cfg := pds.promConfig
  172. durStr := timeutil.DurationString(end.Sub(start))
  173. if durStr == "" {
  174. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  175. }
  176. queryNodeInfo := fmt.Sprintf(queryFmtNodeInfo, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  177. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNodeInfo)
  178. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  179. return source.NewFuture(source.DecodeNodeInfoResult, ctx.QueryAtTime(queryNodeInfo, end))
  180. }
  181. func (pds *PrometheusMetricsQuerier) QueryNodeUptime(start, end time.Time) *source.Future[source.UptimeResult] {
  182. const queryName = "QueryNodeUptime"
  183. const queryFmtNodeUptime = `avg(node_info{%s}) by (%s, uid)[%s:%dm]`
  184. cfg := pds.promConfig
  185. minsPerResolution := cfg.DataResolutionMinutes
  186. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  187. if durStr == "" {
  188. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  189. }
  190. queryNodeUptime := fmt.Sprintf(queryFmtNodeUptime, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  191. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNodeUptime)
  192. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  193. return source.NewFuture(source.DecodeUptimeResult, ctx.QueryAtTime(queryNodeUptime, end))
  194. }
  195. func (pds *PrometheusMetricsQuerier) QueryNodeCPUCoresCapacity(start, end time.Time) *source.Future[source.NodeCPUCoresCapacityResult] {
  196. const queryName = "QueryNodeCPUCoresCapacity"
  197. const nodeCPUCoresCapacityQuery = `avg(avg_over_time(kube_node_status_capacity_cpu_cores{%s}[%s])) by (%s, node, uid)`
  198. cfg := pds.promConfig
  199. durStr := timeutil.DurationString(end.Sub(start))
  200. if durStr == "" {
  201. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  202. }
  203. queryNodeCPUCoresCapacity := fmt.Sprintf(nodeCPUCoresCapacityQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  204. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNodeCPUCoresCapacity)
  205. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  206. return source.NewFuture(source.DecodeNodeCPUCoresCapacityResult, ctx.QueryAtTime(queryNodeCPUCoresCapacity, end))
  207. }
  208. func (pds *PrometheusMetricsQuerier) QueryNodeCPUCoresAllocatable(start, end time.Time) *source.Future[source.NodeCPUCoresAllocatableResult] {
  209. const queryName = "QueryNodeCPUCoresAllocatable"
  210. const nodeCPUCoresAllocatableQuery = `avg(avg_over_time(kube_node_status_allocatable_cpu_cores{%s}[%s])) by (%s, node, uid)`
  211. // `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
  212. cfg := pds.promConfig
  213. durStr := timeutil.DurationString(end.Sub(start))
  214. if durStr == "" {
  215. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  216. }
  217. queryNodeCPUCoresAllocatable := fmt.Sprintf(nodeCPUCoresAllocatableQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  218. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNodeCPUCoresAllocatable)
  219. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  220. return source.NewFuture(source.DecodeNodeCPUCoresAllocatableResult, ctx.QueryAtTime(queryNodeCPUCoresAllocatable, end))
  221. }
  222. func (pds *PrometheusMetricsQuerier) QueryNodeRAMBytesCapacity(start, end time.Time) *source.Future[source.NodeRAMBytesCapacityResult] {
  223. const queryName = "QueryNodeRAMBytesCapacity"
  224. const nodeRAMBytesCapacityQuery = `avg(avg_over_time(kube_node_status_capacity_memory_bytes{%s}[%s])) by (%s, node, uid)`
  225. cfg := pds.promConfig
  226. durStr := timeutil.DurationString(end.Sub(start))
  227. if durStr == "" {
  228. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  229. }
  230. queryNodeRAMBytesCapacity := fmt.Sprintf(nodeRAMBytesCapacityQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  231. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNodeRAMBytesCapacity)
  232. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  233. return source.NewFuture(source.DecodeNodeRAMBytesCapacityResult, ctx.QueryAtTime(queryNodeRAMBytesCapacity, end))
  234. }
  235. func (pds *PrometheusMetricsQuerier) QueryNodeRAMBytesAllocatable(start, end time.Time) *source.Future[source.NodeRAMBytesAllocatableResult] {
  236. const queryName = "QueryNodeRAMBytesAllocatable"
  237. const nodeRAMBytesAllocatableQuery = `avg(avg_over_time(kube_node_status_allocatable_memory_bytes{%s}[%s])) by (%s, node, uid)`
  238. cfg := pds.promConfig
  239. durStr := timeutil.DurationString(end.Sub(start))
  240. if durStr == "" {
  241. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  242. }
  243. queryNodeRAMBytesAllocatable := fmt.Sprintf(nodeRAMBytesAllocatableQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  244. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNodeRAMBytesAllocatable)
  245. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  246. return source.NewFuture(source.DecodeNodeRAMBytesAllocatableResult, ctx.QueryAtTime(queryNodeRAMBytesAllocatable, end))
  247. }
  248. func (pds *PrometheusMetricsQuerier) QueryNodeGPUCount(start, end time.Time) *source.Future[source.NodeGPUCountResult] {
  249. const queryName = "QueryNodeGPUCount"
  250. const nodeGPUCountQuery = `avg(avg_over_time(node_gpu_count{%s}[%s])) by (%s, node, uid, provider_id)`
  251. cfg := pds.promConfig
  252. durStr := timeutil.DurationString(end.Sub(start))
  253. if durStr == "" {
  254. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  255. }
  256. queryNodeGPUCount := fmt.Sprintf(nodeGPUCountQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  257. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNodeGPUCount)
  258. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  259. return source.NewFuture(source.DecodeNodeGPUCountResult, ctx.QueryAtTime(queryNodeGPUCount, end))
  260. }
  261. func (pds *PrometheusMetricsQuerier) QueryNodeLabels(start, end time.Time) *source.Future[source.NodeLabelsResult] {
  262. const queryName = "QueryNodeLabels"
  263. const labelsQuery = `avg_over_time(kube_node_labels{%s}[%s])`
  264. cfg := pds.promConfig
  265. durStr := timeutil.DurationString(end.Sub(start))
  266. if durStr == "" {
  267. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  268. }
  269. queryLabels := fmt.Sprintf(labelsQuery, cfg.ClusterFilter, durStr)
  270. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryLabels)
  271. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  272. return source.NewFuture(source.DecodeNodeLabelsResult, ctx.QueryAtTime(queryLabels, end))
  273. }
  274. func (pds *PrometheusMetricsQuerier) QueryNodeActiveMinutes(start, end time.Time) *source.Future[source.NodeActiveMinutesResult] {
  275. const queryName = "QueryNodeActiveMinutes"
  276. const activeMinsQuery = `avg(node_total_hourly_cost{%s}) by (node, uid, %s, provider_id)[%s:%dm]`
  277. cfg := pds.promConfig
  278. minsPerResolution := cfg.DataResolutionMinutes
  279. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  280. if durStr == "" {
  281. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  282. }
  283. queryActiveMins := fmt.Sprintf(activeMinsQuery, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  284. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryActiveMins)
  285. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  286. return source.NewFuture(source.DecodeNodeActiveMinutesResult, ctx.QueryAtTime(queryActiveMins, end))
  287. }
  288. func (pds *PrometheusMetricsQuerier) QueryNodeCPUModeTotal(start, end time.Time) *source.Future[source.NodeCPUModeTotalResult] {
  289. const queryName = "QueryNodeCPUModeTotal"
  290. const nodeCPUModeTotalQuery = `sum(rate(node_cpu_seconds_total{%s}[%s:%dm])) by (kubernetes_node, uid, %s, mode)`
  291. cfg := pds.promConfig
  292. minsPerResolution := cfg.DataResolutionMinutes
  293. durStr := pds.durationStringFor(start, end, minsPerResolution, true)
  294. if durStr == "" {
  295. panic("failed to parse duration string passed to QueryNodeCPUModeTotal")
  296. }
  297. queryCPUModeTotal := fmt.Sprintf(nodeCPUModeTotalQuery, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
  298. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryCPUModeTotal)
  299. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  300. return source.NewFuture(source.DecodeNodeCPUModeTotalResult, ctx.QueryAtTime(queryCPUModeTotal, end))
  301. }
  302. func (pds *PrometheusMetricsQuerier) QueryNodeRAMSystemPercent(start, end time.Time) *source.Future[source.NodeRAMSystemPercentResult] {
  303. const queryName = "QueryNodeRAMSystemPercent"
  304. const nodeRAMSystemPctQuery = `sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system", %s}[%s:%dm])) by (instance, uid, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm])) by (node, uid, %s), "instance", "$1", "node", "(.*)")) by (instance, uid, %s)`
  305. cfg := pds.promConfig
  306. minsPerResolution := cfg.DataResolutionMinutes
  307. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  308. if durStr == "" {
  309. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  310. }
  311. queryRAMSystemPct := fmt.Sprintf(nodeRAMSystemPctQuery, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel, cfg.ClusterLabel)
  312. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryRAMSystemPct)
  313. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  314. return source.NewFuture(source.DecodeNodeRAMSystemPercentResult, ctx.QueryAtTime(queryRAMSystemPct, end))
  315. }
  316. func (pds *PrometheusMetricsQuerier) QueryNodeRAMUserPercent(start, end time.Time) *source.Future[source.NodeRAMUserPercentResult] {
  317. const queryName = "QueryNodeRAMUserPercent"
  318. const nodeRAMUserPctQuery = `sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system", %s}[%s:%dm])) by (instance, uid, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm])) by (node, uid, %s), "instance", "$1", "node", "(.*)")) by (instance, uid, %s)`
  319. cfg := pds.promConfig
  320. minsPerResolution := cfg.DataResolutionMinutes
  321. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  322. if durStr == "" {
  323. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  324. }
  325. queryRAMUserPct := fmt.Sprintf(nodeRAMUserPctQuery, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel, cfg.ClusterLabel)
  326. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryRAMUserPct)
  327. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  328. return source.NewFuture(source.DecodeNodeRAMUserPercentResult, ctx.QueryAtTime(queryRAMUserPct, end))
  329. }
  330. func (pds *PrometheusMetricsQuerier) QueryNodeResourceCapacities(start, end time.Time) *source.Future[source.ResourceResult] {
  331. const queryName = "QueryNodeResourceCapacities"
  332. const queryFmtNodeResourceCapacities = `avg(avg_over_time(kube_node_status_capacity{%s}[%s])) by (%s, node, uid, resource, unit)`
  333. cfg := pds.promConfig
  334. durStr := timeutil.DurationString(end.Sub(start))
  335. if durStr == "" {
  336. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  337. }
  338. queryNodeResourceCapacities := fmt.Sprintf(queryFmtNodeResourceCapacities, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  339. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNodeResourceCapacities)
  340. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  341. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryNodeResourceCapacities, end))
  342. }
  343. func (pds *PrometheusMetricsQuerier) QueryNodeResourcesAllocatable(start, end time.Time) *source.Future[source.ResourceResult] {
  344. const queryName = "QueryNodeResourcesAllocatable"
  345. const queryFmtNodeResourcesAllocatable = `avg(avg_over_time(kube_node_status_allocatable{%s}[%s])) by (%s, node, uid, resource, unit)`
  346. cfg := pds.promConfig
  347. durStr := timeutil.DurationString(end.Sub(start))
  348. if durStr == "" {
  349. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  350. }
  351. queryNodeResourcesAllocatable := fmt.Sprintf(queryFmtNodeResourcesAllocatable, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  352. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNodeResourcesAllocatable)
  353. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  354. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryNodeResourcesAllocatable, end))
  355. }
  356. func (pds *PrometheusMetricsQuerier) QueryLBPricePerHr(start, end time.Time) *source.Future[source.LBPricePerHrResult] {
  357. const queryName = "QueryLBPricePerHr"
  358. const queryFmtLBCostPerHr = `avg(avg_over_time(kubecost_load_balancer_cost{%s}[%s])) by (namespace, service_name, ingress_ip, uid, %s)`
  359. cfg := pds.promConfig
  360. durStr := timeutil.DurationString(end.Sub(start))
  361. if durStr == "" {
  362. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  363. }
  364. queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  365. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryLBCostPerHr)
  366. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  367. return source.NewFuture(source.DecodeLBPricePerHrResult, ctx.QueryAtTime(queryLBCostPerHr, end))
  368. }
  369. func (pds *PrometheusMetricsQuerier) QueryLBActiveMinutes(start, end time.Time) *source.Future[source.LBActiveMinutesResult] {
  370. const queryName = "QueryLBActiveMinutes"
  371. const lbActiveMinutesQuery = `avg(kubecost_load_balancer_cost{%s}) by (namespace, service_name, uid, %s, ingress_ip)[%s:%dm]`
  372. cfg := pds.promConfig
  373. minsPerResolution := cfg.DataResolutionMinutes
  374. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  375. if durStr == "" {
  376. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  377. }
  378. queryLBActiveMins := fmt.Sprintf(lbActiveMinutesQuery, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  379. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryLBActiveMins)
  380. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  381. return source.NewFuture(source.DecodeLBActiveMinutesResult, ctx.QueryAtTime(queryLBActiveMins, end))
  382. }
  383. func (pds *PrometheusMetricsQuerier) QueryClusterInfo(start, end time.Time) *source.Future[source.ClusterInfoResult] {
  384. const queryName = "QueryClusterInfo"
  385. const queryFmtClusterInfo = `avg(avg_over_time(cluster_info{%s}[%s])) by (%s, uid, provider, account_id, provisioner_name, region)`
  386. cfg := pds.promConfig
  387. durStr := timeutil.DurationString(end.Sub(start))
  388. if durStr == "" {
  389. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  390. }
  391. queryClusterInfo := fmt.Sprintf(queryFmtClusterInfo, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  392. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryClusterInfo)
  393. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  394. return source.NewFuture(source.DecodeClusterInfoResult, ctx.QueryAtTime(queryClusterInfo, end))
  395. }
  396. func (pds *PrometheusMetricsQuerier) QueryClusterUptime(start, end time.Time) *source.Future[source.UptimeResult] {
  397. const queryName = "QueryClusterUptime"
  398. const queryFmtClusterUptime = `avg(cluster_info{%s}) by (%s, uid)[%s:%dm]`
  399. cfg := pds.promConfig
  400. minsPerResolution := cfg.DataResolutionMinutes
  401. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  402. if durStr == "" {
  403. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  404. }
  405. queryClusterUptime := fmt.Sprintf(queryFmtClusterUptime, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  406. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryFmtClusterUptime)
  407. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  408. return source.NewFuture(source.DecodeUptimeResult, ctx.QueryAtTime(queryClusterUptime, end))
  409. }
  410. func (pds *PrometheusMetricsQuerier) QueryClusterManagementDuration(start, end time.Time) *source.Future[source.ClusterManagementDurationResult] {
  411. const queryName = "QueryClusterManagementDuration"
  412. const clusterManagementDurationQuery = `avg(kubecost_cluster_management_cost{%s}) by (%s, provisioner_name)[%s:%dm]`
  413. cfg := pds.promConfig
  414. minsPerResolution := cfg.DataResolutionMinutes
  415. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  416. if durStr == "" {
  417. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  418. }
  419. queryClusterManagementDuration := fmt.Sprintf(clusterManagementDurationQuery, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  420. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryClusterManagementDuration)
  421. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  422. return source.NewFuture(source.DecodeClusterManagementDurationResult, ctx.QueryAtTime(queryClusterManagementDuration, end))
  423. }
  424. func (pds *PrometheusMetricsQuerier) QueryClusterManagementPricePerHr(start, end time.Time) *source.Future[source.ClusterManagementPricePerHrResult] {
  425. const queryName = "QueryClusterManagementPricePerHr"
  426. const clusterManagementCostQuery = `avg(avg_over_time(kubecost_cluster_management_cost{%s}[%s])) by (%s, provisioner_name)`
  427. cfg := pds.promConfig
  428. durStr := timeutil.DurationString(end.Sub(start))
  429. if durStr == "" {
  430. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  431. }
  432. queryClusterManagementCost := fmt.Sprintf(clusterManagementCostQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  433. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryClusterManagementCost)
  434. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  435. return source.NewFuture(source.DecodeClusterManagementPricePerHrResult, ctx.QueryAtTime(queryClusterManagementCost, end))
  436. }
  437. // AllocationMetricQuerier
  438. func (pds *PrometheusMetricsQuerier) QueryPods(start, end time.Time) *source.Future[source.PodsResult] {
  439. const queryName = "QueryPods"
  440. const queryFmtPods = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, uid, %s)[%s:%dm]`
  441. cfg := pds.promConfig
  442. minsPerResolution := cfg.DataResolutionMinutes
  443. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  444. if durStr == "" {
  445. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  446. }
  447. queryPods := fmt.Sprintf(queryFmtPods, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  448. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPods)
  449. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  450. return source.NewFuture(source.DecodePodsResult, ctx.QueryAtTime(queryPods, end))
  451. }
  452. func (pds *PrometheusMetricsQuerier) QueryPodsUID(start, end time.Time) *source.Future[source.PodsResult] {
  453. const queryName = "QueryPodsUID"
  454. const queryFmtPodsUID = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, uid, %s)[%s:%dm]`
  455. cfg := pds.promConfig
  456. minsPerResolution := cfg.DataResolutionMinutes
  457. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  458. if durStr == "" {
  459. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  460. }
  461. queryPodsUID := fmt.Sprintf(queryFmtPodsUID, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  462. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPodsUID)
  463. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  464. return source.NewFuture(source.DecodePodsResult, ctx.QueryAtTime(queryPodsUID, end))
  465. }
  466. func (pds *PrometheusMetricsQuerier) QueryPodInfo(start, end time.Time) *source.Future[source.PodInfoResult] {
  467. const queryName = "QueryPodInfo"
  468. const queryFmtPodInfo = `avg(avg_over_time(pod_info{%s}[%s])) by (%s, pod, uid, namespace_uid, node_uid)`
  469. cfg := pds.promConfig
  470. durStr := timeutil.DurationString(end.Sub(start))
  471. if durStr == "" {
  472. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  473. }
  474. queryPodInfo := fmt.Sprintf(queryFmtPodInfo, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  475. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPodInfo)
  476. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  477. return source.NewFuture(source.DecodePodInfoResult, ctx.QueryAtTime(queryPodInfo, end))
  478. }
  479. func (pds *PrometheusMetricsQuerier) QueryPodUptime(start, end time.Time) *source.Future[source.UptimeResult] {
  480. const queryName = "QueryPodUptime"
  481. const queryFmtPodUptime = `avg(pod_info{%s}) by (%s, uid)[%s:%dm]`
  482. cfg := pds.promConfig
  483. minsPerResolution := cfg.DataResolutionMinutes
  484. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  485. if durStr == "" {
  486. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  487. }
  488. queryPodUptime := fmt.Sprintf(queryFmtPodUptime, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  489. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPodUptime)
  490. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  491. return source.NewFuture(source.DecodeUptimeResult, ctx.QueryAtTime(queryPodUptime, end))
  492. }
  493. func (pds *PrometheusMetricsQuerier) QueryPodOwners(start, end time.Time) *source.Future[source.OwnerResult] {
  494. const queryName = "QueryPodOwners"
  495. const queryFmtPodOwners = `avg(avg_over_time(kube_pod_owner{%s}[%s])) by (%s, uid, owner_uid, owner_kind)`
  496. cfg := pds.promConfig
  497. durStr := timeutil.DurationString(end.Sub(start))
  498. if durStr == "" {
  499. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  500. }
  501. queryPodOwners := fmt.Sprintf(queryFmtPodOwners, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  502. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPodOwners)
  503. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  504. return source.NewFuture(source.DecodeOwnerResult, ctx.QueryAtTime(queryPodOwners, end))
  505. }
  506. func (pds *PrometheusMetricsQuerier) QueryPodPVCVolumes(start, end time.Time) *source.Future[source.PodPVCVolumeResult] {
  507. const queryName = "QueryPodPVCVolumes"
  508. const queryFmtPodPVCVolumes = `avg(avg_over_time(pod_pvc_volume{%s}[%s])) by (%s, uid, persistentvolumeclaim_uid, pod_volume_name)`
  509. cfg := pds.promConfig
  510. durStr := timeutil.DurationString(end.Sub(start))
  511. if durStr == "" {
  512. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  513. }
  514. queryPodPVCVolumes := fmt.Sprintf(queryFmtPodPVCVolumes, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  515. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPodPVCVolumes)
  516. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  517. return source.NewFuture(source.DecodePodPVCVolumeResult, ctx.QueryAtTime(queryPodPVCVolumes, end))
  518. }
  519. func (pds *PrometheusMetricsQuerier) QueryPodNetworkEgressBytes(start, end time.Time) *source.Future[source.PodNetworkBytesResult] {
  520. const queryName = "QueryPodNetworkEgressBytes"
  521. const queryFmt = `sum(increase(kubecost_pod_network_egress_bytes_total{uid!="", %s}[%s:%dm])) by (uid, service, internet, same_region, same_zone, nat_gateway, %s)`
  522. cfg := pds.promConfig
  523. minsPerResolution := cfg.DataResolutionMinutes
  524. durStr := pds.durationStringFor(start, end, minsPerResolution, true)
  525. if durStr == "" {
  526. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  527. }
  528. q := fmt.Sprintf(queryFmt, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
  529. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), q)
  530. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  531. return source.NewFuture(source.DecodePodNetworkBytesResult, ctx.QueryAtTime(q, end))
  532. }
  533. func (pds *PrometheusMetricsQuerier) QueryPodNetworkIngressBytes(start, end time.Time) *source.Future[source.PodNetworkBytesResult] {
  534. const queryName = "QueryPodNetworkIngressBytes"
  535. const queryFmt = `sum(increase(kubecost_pod_network_ingress_bytes_total{uid!="", %s}[%s:%dm])) by (uid, service, internet, same_region, same_zone, nat_gateway, %s)`
  536. cfg := pds.promConfig
  537. minsPerResolution := cfg.DataResolutionMinutes
  538. durStr := pds.durationStringFor(start, end, minsPerResolution, true)
  539. if durStr == "" {
  540. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  541. }
  542. q := fmt.Sprintf(queryFmt, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
  543. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), q)
  544. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  545. return source.NewFuture(source.DecodePodNetworkBytesResult, ctx.QueryAtTime(q, end))
  546. }
  547. func (pds *PrometheusMetricsQuerier) QueryContainerUptime(start, end time.Time) *source.Future[source.ContainerUptimeResult] {
  548. const queryName = "QueryContainerUptime"
  549. const queryFmtContainerUptime = `avg(kube_pod_container_status_running{container!="", %s} != 0) by (container, uid, %s)[%s:%dm]`
  550. cfg := pds.promConfig
  551. minsPerResolution := cfg.DataResolutionMinutes
  552. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  553. if durStr == "" {
  554. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  555. }
  556. queryContainerUptime := fmt.Sprintf(queryFmtContainerUptime, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  557. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryContainerUptime)
  558. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  559. return source.NewFuture(source.DecodeContainerUptimeResult, ctx.QueryAtTime(queryContainerUptime, end))
  560. }
  561. func (pds *PrometheusMetricsQuerier) QueryContainerResourceRequests(start, end time.Time) *source.Future[source.ContainerResourceResult] {
  562. const queryName = "QueryContainerResourceRequests"
  563. const queryFmtContainerResourceRequests = `avg(avg_over_time(kube_pod_container_resource_requests{container!="", container!="POD", node!="", %s}[%s])) by (container, uid, resource, unit, %s)`
  564. cfg := pds.promConfig
  565. durStr := timeutil.DurationString(end.Sub(start))
  566. if durStr == "" {
  567. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  568. }
  569. queryContainerResourceRequests := fmt.Sprintf(queryFmtContainerResourceRequests, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  570. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryContainerResourceRequests)
  571. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  572. return source.NewFuture(source.DecodeContainerResourceResult, ctx.QueryAtTime(queryContainerResourceRequests, end))
  573. }
  574. func (pds *PrometheusMetricsQuerier) QueryContainerResourceLimits(start, end time.Time) *source.Future[source.ContainerResourceResult] {
  575. const queryName = "QueryContainerResourceLimits"
  576. const queryFmtContainerResourceLimits = `avg(avg_over_time(kube_pod_container_resource_limits{container!="", container!="POD", node!="", %s}[%s])) by (container, uid, resource, unit, %s)`
  577. cfg := pds.promConfig
  578. durStr := timeutil.DurationString(end.Sub(start))
  579. if durStr == "" {
  580. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  581. }
  582. queryContainerResourceLimits := fmt.Sprintf(queryFmtContainerResourceLimits, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  583. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryContainerResourceLimits)
  584. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  585. return source.NewFuture(source.DecodeContainerResourceResult, ctx.QueryAtTime(queryContainerResourceLimits, end))
  586. }
  587. func (pds *PrometheusMetricsQuerier) QueryRAMBytesAllocated(start, end time.Time) *source.Future[source.RAMBytesAllocatedResult] {
  588. const queryName = "QueryRAMBytesAllocated"
  589. const queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, uid, %s, provider_id)`
  590. cfg := pds.promConfig
  591. durStr := timeutil.DurationString(end.Sub(start))
  592. if durStr == "" {
  593. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  594. }
  595. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  596. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryRAMBytesAllocated)
  597. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  598. return source.NewFuture(source.DecodeRAMBytesAllocatedResult, ctx.QueryAtTime(queryRAMBytesAllocated, end))
  599. }
  600. func (pds *PrometheusMetricsQuerier) QueryRAMRequests(start, end time.Time) *source.Future[source.RAMRequestsResult] {
  601. const queryName = "QueryRAMRequests"
  602. const queryFmtRAMRequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, uid, %s)`
  603. cfg := pds.promConfig
  604. durStr := timeutil.DurationString(end.Sub(start))
  605. if durStr == "" {
  606. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  607. }
  608. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  609. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryRAMRequests)
  610. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  611. return source.NewFuture(source.DecodeRAMRequestsResult, ctx.QueryAtTime(queryRAMRequests, end))
  612. }
  613. func (pds *PrometheusMetricsQuerier) QueryRAMLimits(start, end time.Time) *source.Future[source.RAMLimitsResult] {
  614. const queryName = "QueryRAMLimits"
  615. const queryFmtRAMLimits = `avg(avg_over_time(kube_pod_container_resource_limits{resource="memory", unit="byte", container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
  616. cfg := pds.promConfig
  617. durStr := timeutil.DurationString(end.Sub(start))
  618. if durStr == "" {
  619. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  620. }
  621. queryRAMLimits := fmt.Sprintf(queryFmtRAMLimits, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  622. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryRAMLimits)
  623. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  624. return source.NewFuture(source.DecodeRAMLimitsResult, ctx.QueryAtTime(queryRAMLimits, end))
  625. }
  626. func (pds *PrometheusMetricsQuerier) QueryRAMUsageAvg(start, end time.Time) *source.Future[source.RAMUsageAvgResult] {
  627. const queryName = "QueryRAMUsageAvg"
  628. const queryFmtRAMUsageAvg = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, node, instance, uid, %s)`
  629. cfg := pds.promConfig
  630. durStr := timeutil.DurationString(end.Sub(start))
  631. if durStr == "" {
  632. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  633. }
  634. queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  635. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryRAMUsageAvg)
  636. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  637. return source.NewFuture(source.DecodeRAMUsageAvgResult, ctx.QueryAtTime(queryRAMUsageAvg, end))
  638. }
  639. func (pds *PrometheusMetricsQuerier) QueryRAMUsageMax(start, end time.Time) *source.Future[source.RAMUsageMaxResult] {
  640. const queryName = "QueryRAMUsageMax"
  641. const queryFmtRAMUsageMax = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, node, instance, uid, %s)`
  642. cfg := pds.promConfig
  643. durStr := timeutil.DurationString(end.Sub(start))
  644. if durStr == "" {
  645. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  646. }
  647. queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  648. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryRAMUsageMax)
  649. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  650. return source.NewFuture(source.DecodeRAMUsageMaxResult, ctx.QueryAtTime(queryRAMUsageMax, end))
  651. }
  652. func (pds *PrometheusMetricsQuerier) QueryCPUCoresAllocated(start, end time.Time) *source.Future[source.CPUCoresAllocatedResult] {
  653. const queryName = "QueryCPUCoresAllocated"
  654. const queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, uid, %s)`
  655. cfg := pds.promConfig
  656. durStr := timeutil.DurationString(end.Sub(start))
  657. if durStr == "" {
  658. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  659. }
  660. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  661. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryCPUCoresAllocated)
  662. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  663. return source.NewFuture(source.DecodeCPUCoresAllocatedResult, ctx.QueryAtTime(queryCPUCoresAllocated, end))
  664. }
  665. func (pds *PrometheusMetricsQuerier) QueryCPURequests(start, end time.Time) *source.Future[source.CPURequestsResult] {
  666. const queryName = "QueryCPURequests"
  667. const queryFmtCPURequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, uid, %s)`
  668. cfg := pds.promConfig
  669. durStr := timeutil.DurationString(end.Sub(start))
  670. if durStr == "" {
  671. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  672. }
  673. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  674. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryCPURequests)
  675. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  676. return source.NewFuture(source.DecodeCPURequestsResult, ctx.QueryAtTime(queryCPURequests, end))
  677. }
  678. func (pds *PrometheusMetricsQuerier) QueryCPULimits(start, end time.Time) *source.Future[source.CPULimitsResult] {
  679. const queryName = "QueryCPULimits"
  680. const queryFmtCPULimits = `avg(avg_over_time(kube_pod_container_resource_limits{resource="cpu", unit="core", container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
  681. cfg := pds.promConfig
  682. durStr := timeutil.DurationString(end.Sub(start))
  683. if durStr == "" {
  684. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  685. }
  686. queryCPULimits := fmt.Sprintf(queryFmtCPULimits, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  687. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryCPULimits)
  688. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  689. return source.NewFuture(source.DecodeCPULimitsResult, ctx.QueryAtTime(queryCPULimits, end))
  690. }
  691. func (pds *PrometheusMetricsQuerier) QueryCPUUsageAvg(start, end time.Time) *source.Future[source.CPUUsageAvgResult] {
  692. const queryName = "QueryCPUUsageAvg"
  693. const queryFmtCPUUsageAvg = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, node, instance, uid, %s)`
  694. cfg := pds.promConfig
  695. durStr := timeutil.DurationString(end.Sub(start))
  696. if durStr == "" {
  697. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  698. }
  699. queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  700. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryCPUUsageAvg)
  701. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  702. return source.NewFuture(source.DecodeCPUUsageAvgResult, ctx.QueryAtTime(queryCPUUsageAvg, end))
  703. }
  704. func (pds *PrometheusMetricsQuerier) QueryCPUUsageMax(start, end time.Time) *source.Future[source.CPUUsageMaxResult] {
  705. const queryName = "QueryCPUUsageMax"
  706. // Because we use container_cpu_usage_seconds_total to calculate CPU usage
  707. // at any given "instant" of time, we need to use an irate or rate. To then
  708. // calculate a max (or any aggregation) we have to perform an aggregation
  709. // query on top of an instant-by-instant maximum. Prometheus supports this
  710. // type of query with a "subquery" [1], however it is reportedly expensive
  711. // to make such a query. By default, Kubecost's Prometheus config includes
  712. // a recording rule that keeps track of the instant-by-instant irate for CPU
  713. // usage. The metric in this query is created by that recording rule.
  714. //
  715. // [1] https://prometheus.io/blog/2019/01/28/subquery-support/
  716. //
  717. // If changing the name of the recording rule, make sure to update the
  718. // corresponding diagnostic query to avoid confusion.
  719. const queryFmtCPUUsageMaxRecordingRule = `max(max_over_time(kubecost_container_cpu_usage_irate{%s}[%s])) by (container_name, container, pod_name, pod, namespace, node, instance, uid, %s)`
  720. // This is the subquery equivalent of the above recording rule query. It is
  721. // more expensive, but does not require the recording rule. It should be
  722. // used as a fallback query if the recording rule data does not exist.
  723. //
  724. // The parameter after the colon [:<thisone>] in the subquery affects the
  725. // resolution of the subquery.
  726. // The parameter after the metric ...{}[<thisone>] should be set to 2x
  727. // the resolution, to make sure the irate always has two points to query
  728. // in case the Prom scrape duration has been reduced to be equal to the
  729. // query resolution.
  730. const queryFmtCPUUsageMaxSubquery = `max(max_over_time(irate(container_cpu_usage_seconds_total{container!="POD", container!="", %s}[%dm])[%s:%dm])) by (container, pod_name, pod, namespace, node, instance, uid, %s)`
  731. cfg := pds.promConfig
  732. durStr := timeutil.DurationString(end.Sub(start))
  733. if durStr == "" {
  734. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  735. }
  736. queryCPUUsageMaxRecordingRule := fmt.Sprintf(queryFmtCPUUsageMaxRecordingRule, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  737. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryCPUUsageMaxRecordingRule)
  738. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  739. resCPUUsageMaxRR := ctx.QueryAtTime(queryCPUUsageMaxRecordingRule, end)
  740. resCPUUsageMax, _ := resCPUUsageMaxRR.Await()
  741. if len(resCPUUsageMax) > 0 {
  742. return source.NewFutureFrom(source.DecodeAll(resCPUUsageMax, source.DecodeCPUUsageMaxResult))
  743. }
  744. minsPerResolution := cfg.DataResolutionMinutes
  745. durStr = pds.durationStringFor(start, end, minsPerResolution, false)
  746. if durStr == "" {
  747. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  748. }
  749. queryCPUUsageMaxSubquery := fmt.Sprintf(queryFmtCPUUsageMaxSubquery, cfg.ClusterFilter, 2*minsPerResolution, durStr, minsPerResolution, cfg.ClusterLabel)
  750. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryCPUUsageMaxSubquery)
  751. return source.NewFuture(source.DecodeCPUUsageMaxResult, ctx.QueryAtTime(queryCPUUsageMaxSubquery, end))
  752. }
  753. func (pds *PrometheusMetricsQuerier) QueryGPUsRequested(start, end time.Time) *source.Future[source.GPUsRequestedResult] {
  754. const queryName = "QueryGPUsRequested"
  755. const queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, uid, %s)`
  756. cfg := pds.promConfig
  757. durStr := timeutil.DurationString(end.Sub(start))
  758. if durStr == "" {
  759. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  760. }
  761. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  762. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryGPUsRequested)
  763. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  764. return source.NewFuture(source.DecodeGPUsRequestedResult, ctx.QueryAtTime(queryGPUsRequested, end))
  765. }
  766. func (pds *PrometheusMetricsQuerier) QueryGPUsUsageAvg(start, end time.Time) *source.Future[source.GPUsUsageAvgResult] {
  767. const queryName = "QueryGPUsUsageAvg"
  768. const queryFmtGPUsUsageAvg = `avg(avg_over_time(DCGM_FI_PROF_GR_ENGINE_ACTIVE{container!=""}[%s])) by (container, pod, namespace, pod_uid, %s)`
  769. cfg := pds.promConfig
  770. durStr := timeutil.DurationString(end.Sub(start))
  771. if durStr == "" {
  772. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  773. }
  774. queryGPUsUsageAvg := fmt.Sprintf(queryFmtGPUsUsageAvg, durStr, cfg.ClusterLabel)
  775. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryGPUsUsageAvg)
  776. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  777. return source.NewFuture(source.DecodeGPUsUsageAvgResult, ctx.QueryAtTime(queryGPUsUsageAvg, end))
  778. }
  779. func (pds *PrometheusMetricsQuerier) QueryGPUsUsageMax(start, end time.Time) *source.Future[source.GPUsUsageMaxResult] {
  780. const queryName = "QueryGPUsUsageMax"
  781. const queryFmtGPUsUsageMax = `max(max_over_time(DCGM_FI_PROF_GR_ENGINE_ACTIVE{container!=""}[%s])) by (container, pod, namespace, pod_uid, %s)`
  782. cfg := pds.promConfig
  783. durStr := timeutil.DurationString(end.Sub(start))
  784. if durStr == "" {
  785. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  786. }
  787. queryGPUsUsageMax := fmt.Sprintf(queryFmtGPUsUsageMax, durStr, cfg.ClusterLabel)
  788. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryGPUsUsageMax)
  789. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  790. return source.NewFuture(source.DecodeGPUsUsageMaxResult, ctx.QueryAtTime(queryGPUsUsageMax, end))
  791. }
  792. func (pds *PrometheusMetricsQuerier) QueryGPUsAllocated(start, end time.Time) *source.Future[source.GPUsAllocatedResult] {
  793. const queryName = "QueryGPUsAllocated"
  794. const queryFmtGPUsAllocated = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, uid, %s)`
  795. cfg := pds.promConfig
  796. durStr := timeutil.DurationString(end.Sub(start))
  797. if durStr == "" {
  798. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  799. }
  800. queryGPUsAllocated := fmt.Sprintf(queryFmtGPUsAllocated, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  801. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryGPUsAllocated)
  802. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  803. return source.NewFuture(source.DecodeGPUsAllocatedResult, ctx.QueryAtTime(queryGPUsAllocated, end))
  804. }
  805. func (pds *PrometheusMetricsQuerier) QueryIsGPUShared(start, end time.Time) *source.Future[source.IsGPUSharedResult] {
  806. const queryName = "QueryIsGPUShared"
  807. const queryFmtIsGPUShared = `avg(avg_over_time(kube_pod_container_resource_requests{container!="", node != "", pod != "", container!= "", unit = "integer", %s}[%s])) by (container, pod, namespace, node, resource, uid, %s)`
  808. cfg := pds.promConfig
  809. durStr := timeutil.DurationString(end.Sub(start))
  810. if durStr == "" {
  811. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  812. }
  813. queryIsGPUShared := fmt.Sprintf(queryFmtIsGPUShared, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  814. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryIsGPUShared)
  815. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  816. return source.NewFuture(source.DecodeIsGPUSharedResult, ctx.QueryAtTime(queryIsGPUShared, end))
  817. }
  818. func (pds *PrometheusMetricsQuerier) QueryDCGMDeviceInfo(start, end time.Time) *source.Future[source.DCGMDeviceInfoResult] {
  819. const queryName = "QueryDCGMDeviceInfo"
  820. const queryFmtDCGMDeviceInfo = `avg(avg_over_time(DCGM_FI_DEV_DEC_UTIL{%s}[%s])) by (UUID, device, modelName, Hostname, %s)`
  821. cfg := pds.promConfig
  822. durStr := timeutil.DurationString(end.Sub(start))
  823. if durStr == "" {
  824. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  825. }
  826. queryDCGMDeviceInfo := fmt.Sprintf(queryFmtDCGMDeviceInfo, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  827. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryDCGMDeviceInfo)
  828. ctx := pds.promContexts.NewNamedContext(ComputeCostDataContextName)
  829. return source.NewFuture(source.DecodeDCGMDeviceInfoResult, ctx.QueryAtTime(queryDCGMDeviceInfo, end))
  830. }
  831. func (pds *PrometheusMetricsQuerier) QueryDCGMDeviceUptime(start, end time.Time) *source.Future[source.DCGMDeviceUptimeResult] {
  832. const queryName = "QueryDCGMDeviceUptime"
  833. const queryFmtDCGMDeviceUptime = `avg(DCGM_FI_DEV_DEC_UTIL{%s}) by (UUID, %s)[%s:%dm]`
  834. cfg := pds.promConfig
  835. minsPerResolution := cfg.DataResolutionMinutes
  836. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  837. if durStr == "" {
  838. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  839. }
  840. queryDCGMDeviceUptime := fmt.Sprintf(queryFmtDCGMDeviceUptime, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  841. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryDCGMDeviceUptime)
  842. ctx := pds.promContexts.NewNamedContext(ComputeCostDataContextName)
  843. return source.NewFuture(source.DecodeDCGMDeviceUptimeResult, ctx.QueryAtTime(queryDCGMDeviceUptime, end))
  844. }
  845. func (pds *PrometheusMetricsQuerier) QueryDCGMContainerUsageAvg(start, end time.Time) *source.Future[source.DCGMDeviceContainerUsageResult] {
  846. const queryName = "QueryDCGMContainerUsageAvg"
  847. const queryFmtDCGMContainerUsageAvg = `avg(avg_over_time(DCGM_FI_PROF_GR_ENGINE_ACTIVE{container!="", %s}[%s])) by (UUID, pod_uid, container, %s)`
  848. cfg := pds.promConfig
  849. durStr := timeutil.DurationString(end.Sub(start))
  850. if durStr == "" {
  851. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  852. }
  853. queryDCGMContainerUsageAvg := fmt.Sprintf(queryFmtDCGMContainerUsageAvg, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  854. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryDCGMContainerUsageAvg)
  855. ctx := pds.promContexts.NewNamedContext(ComputeCostDataContextName)
  856. return source.NewFuture(source.DecodeDCGMDeviceContainerUsageResult, ctx.QueryAtTime(queryDCGMContainerUsageAvg, end))
  857. }
  858. func (pds *PrometheusMetricsQuerier) QueryDCGMContainerUsageMax(start, end time.Time) *source.Future[source.DCGMDeviceContainerUsageResult] {
  859. const queryName = "QueryDCGMContainerUsageMax"
  860. const queryFmtDCGMContainerUsageMax = `max(max_over_time(DCGM_FI_PROF_GR_ENGINE_ACTIVE{container!="", %s}[%s])) by (UUID, pod_uid, container, %s)`
  861. cfg := pds.promConfig
  862. durStr := timeutil.DurationString(end.Sub(start))
  863. if durStr == "" {
  864. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  865. }
  866. queryDCGMContainerUsageMax := fmt.Sprintf(queryFmtDCGMContainerUsageMax, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  867. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryDCGMContainerUsageMax)
  868. ctx := pds.promContexts.NewNamedContext(ComputeCostDataContextName)
  869. return source.NewFuture(source.DecodeDCGMDeviceContainerUsageResult, ctx.QueryAtTime(queryDCGMContainerUsageMax, end))
  870. }
  871. func (pds *PrometheusMetricsQuerier) QueryGPUInfo(start, end time.Time) *source.Future[source.GPUInfoResult] {
  872. const queryName = "QueryGPUInfo"
  873. const queryFmtGetGPUInfo = `avg(avg_over_time(DCGM_FI_DEV_DEC_UTIL{container!="",%s}[%s])) by (container, pod, namespace, device, modelName, UUID, pod_uid, %s)`
  874. cfg := pds.promConfig
  875. durStr := timeutil.DurationString(end.Sub(start))
  876. if durStr == "" {
  877. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  878. }
  879. queryGetGPUInfo := fmt.Sprintf(queryFmtGetGPUInfo, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  880. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryGetGPUInfo)
  881. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  882. return source.NewFuture(source.DecodeGPUInfoResult, ctx.QueryAtTime(queryGetGPUInfo, end))
  883. }
  884. func (pds *PrometheusMetricsQuerier) QueryNodeCPUPricePerHr(start, end time.Time) *source.Future[source.NodeCPUPricePerHrResult] {
  885. const queryName = "QueryNodeCPUPricePerHr"
  886. const queryFmtNodeCostPerCPUHr = `avg(avg_over_time(node_cpu_hourly_cost{%s}[%s])) by (node, uid, %s, instance_type, provider_id)`
  887. cfg := pds.promConfig
  888. durStr := timeutil.DurationString(end.Sub(start))
  889. if durStr == "" {
  890. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  891. }
  892. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  893. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNodeCostPerCPUHr)
  894. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  895. return source.NewFuture(source.DecodeNodeCPUPricePerHrResult, ctx.QueryAtTime(queryNodeCostPerCPUHr, end))
  896. }
  897. func (pds *PrometheusMetricsQuerier) QueryNodeRAMPricePerGiBHr(start, end time.Time) *source.Future[source.NodeRAMPricePerGiBHrResult] {
  898. const queryName = "QueryNodeRAMPricePerGiBHr"
  899. const queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost{%s}[%s])) by (node, uid, %s, instance_type, provider_id)`
  900. cfg := pds.promConfig
  901. durStr := timeutil.DurationString(end.Sub(start))
  902. if durStr == "" {
  903. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  904. }
  905. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  906. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNodeCostPerRAMGiBHr)
  907. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  908. return source.NewFuture(source.DecodeNodeRAMPricePerGiBHrResult, ctx.QueryAtTime(queryNodeCostPerRAMGiBHr, end))
  909. }
  910. func (pds *PrometheusMetricsQuerier) QueryNodeGPUPricePerHr(start, end time.Time) *source.Future[source.NodeGPUPricePerHrResult] {
  911. const queryName = "QueryNodeGPUPricePerHr"
  912. const queryFmtNodeCostPerGPUHr = `avg(avg_over_time(node_gpu_hourly_cost{%s}[%s])) by (node, uid, %s, instance_type, provider_id)`
  913. cfg := pds.promConfig
  914. durStr := timeutil.DurationString(end.Sub(start))
  915. if durStr == "" {
  916. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  917. }
  918. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  919. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNodeCostPerGPUHr)
  920. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  921. return source.NewFuture(source.DecodeNodeGPUPricePerHrResult, ctx.QueryAtTime(queryNodeCostPerGPUHr, end))
  922. }
  923. func (pds *PrometheusMetricsQuerier) QueryNodeIsSpot(start, end time.Time) *source.Future[source.NodeIsSpotResult] {
  924. const queryName = "QueryNodeIsSpot"
  925. const queryFmtNodeIsSpot = `avg_over_time(kubecost_node_is_spot{%s}[%s])`
  926. cfg := pds.promConfig
  927. durStr := timeutil.DurationString(end.Sub(start))
  928. if durStr == "" {
  929. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  930. }
  931. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, cfg.ClusterFilter, durStr)
  932. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNodeIsSpot)
  933. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  934. return source.NewFuture(source.DecodeNodeIsSpotResult, ctx.QueryAtTime(queryNodeIsSpot, end))
  935. }
  936. func (pds *PrometheusMetricsQuerier) QueryPodPVCAllocation(start, end time.Time) *source.Future[source.PodPVCAllocationResult] {
  937. const queryName = "QueryPodPVCAllocation"
  938. const queryFmtPodPVCAllocation = `avg(avg_over_time(pod_pvc_allocation{%s}[%s])) by (persistentvolume, persistentvolumeclaim, pod, namespace, uid, %s)`
  939. cfg := pds.promConfig
  940. durStr := timeutil.DurationString(end.Sub(start))
  941. if durStr == "" {
  942. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  943. }
  944. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  945. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPodPVCAllocation)
  946. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  947. return source.NewFuture(source.DecodePodPVCAllocationResult, ctx.QueryAtTime(queryPodPVCAllocation, end))
  948. }
  949. func (pds *PrometheusMetricsQuerier) QueryPVCBytesRequested(start, end time.Time) *source.Future[source.PVCBytesRequestedResult] {
  950. const queryName = "QueryPVCBytesRequested"
  951. const queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{%s}[%s])) by (persistentvolumeclaim, namespace, uid, %s)`
  952. cfg := pds.promConfig
  953. durStr := timeutil.DurationString(end.Sub(start))
  954. if durStr == "" {
  955. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  956. }
  957. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  958. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPVCBytesRequested)
  959. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  960. return source.NewFuture(source.DecodePVCBytesRequestedResult, ctx.QueryAtTime(queryPVCBytesRequested, end))
  961. }
  962. func (pds *PrometheusMetricsQuerier) QueryPVBytes(start, end time.Time) *source.Future[source.PVBytesResult] {
  963. const queryName = "QueryPVBytes"
  964. const queryFmtPVBytes = `avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s])) by (persistentvolume, uid, %s)`
  965. cfg := pds.promConfig
  966. durStr := timeutil.DurationString(end.Sub(start))
  967. if durStr == "" {
  968. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  969. }
  970. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  971. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPVBytes)
  972. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  973. return source.NewFuture(source.DecodePVBytesResult, ctx.QueryAtTime(queryPVBytes, end))
  974. }
  975. func (pds *PrometheusMetricsQuerier) QueryPVInfo(start, end time.Time) *source.Future[source.PVInfoResult] {
  976. const queryName = "QueryPVInfo"
  977. const queryFmtPVMeta = `avg(avg_over_time(kubecost_pv_info{%s}[%s])) by (%s, storageclass, persistentvolume, uid, provider_id)`
  978. cfg := pds.promConfig
  979. durStr := timeutil.DurationString(end.Sub(start))
  980. if durStr == "" {
  981. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  982. }
  983. queryPVMeta := fmt.Sprintf(queryFmtPVMeta, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  984. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPVMeta)
  985. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  986. return source.NewFuture(source.DecodePVInfoResult, ctx.QueryAtTime(queryPVMeta, end))
  987. }
  988. func (pds *PrometheusMetricsQuerier) QueryPVUptime(start, end time.Time) *source.Future[source.UptimeResult] {
  989. const queryName = "QueryPVUptime"
  990. const queryFmtPVUptime = `avg(kubecost_pv_info{%s}) by (%s, uid)[%s:%dm]`
  991. cfg := pds.promConfig
  992. minsPerResolution := cfg.DataResolutionMinutes
  993. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  994. if durStr == "" {
  995. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  996. }
  997. queryPVUptime := fmt.Sprintf(queryFmtPVUptime, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  998. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPVUptime)
  999. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1000. return source.NewFuture(source.DecodeUptimeResult, ctx.QueryAtTime(queryPVUptime, end))
  1001. }
  1002. func (pds *PrometheusMetricsQuerier) QueryNetZoneGiB(start, end time.Time) *source.Future[source.NetZoneGiBResult] {
  1003. const queryName = "QueryNetZoneGiB"
  1004. const queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", same_zone="false", same_region="true", %s}[%s:%dm])) by (pod_name, namespace, uid, %s) / 1024 / 1024 / 1024`
  1005. cfg := pds.promConfig
  1006. minsPerResolution := cfg.DataResolutionMinutes
  1007. durStr := pds.durationStringFor(start, end, minsPerResolution, true)
  1008. if durStr == "" {
  1009. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1010. }
  1011. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
  1012. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNetZoneGiB)
  1013. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1014. return source.NewFuture(source.DecodeNetZoneGiBResult, ctx.QueryAtTime(queryNetZoneGiB, end))
  1015. }
  1016. func (pds *PrometheusMetricsQuerier) QueryNetZonePricePerGiB(start, end time.Time) *source.Future[source.NetZonePricePerGiBResult] {
  1017. const queryName = "QueryNetZonePricePerGiB"
  1018. const queryFmtNetZoneCostPerGiB = `avg(avg_over_time(kubecost_network_zone_egress_cost{%s}[%s])) by (%s)`
  1019. cfg := pds.promConfig
  1020. durStr := timeutil.DurationString(end.Sub(start))
  1021. if durStr == "" {
  1022. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1023. }
  1024. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1025. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNetZoneCostPerGiB)
  1026. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1027. return source.NewFuture(source.DecodeNetZonePricePerGiBResult, ctx.QueryAtTime(queryNetZoneCostPerGiB, end))
  1028. }
  1029. func (pds *PrometheusMetricsQuerier) QueryNetRegionGiB(start, end time.Time) *source.Future[source.NetRegionGiBResult] {
  1030. const queryName = "QueryNetRegionGiB"
  1031. const queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", same_zone="false", same_region="false", %s}[%s:%dm])) by (pod_name, namespace, uid, %s) / 1024 / 1024 / 1024`
  1032. cfg := pds.promConfig
  1033. minsPerResolution := cfg.DataResolutionMinutes
  1034. durStr := pds.durationStringFor(start, end, minsPerResolution, true)
  1035. if durStr == "" {
  1036. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1037. }
  1038. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
  1039. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNetRegionGiB)
  1040. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1041. return source.NewFuture(source.DecodeNetRegionGiBResult, ctx.QueryAtTime(queryNetRegionGiB, end))
  1042. }
  1043. func (pds *PrometheusMetricsQuerier) QueryNetRegionPricePerGiB(start, end time.Time) *source.Future[source.NetRegionPricePerGiBResult] {
  1044. const queryName = "QueryNetRegionPricePerGiB"
  1045. const queryFmtNetRegionCostPerGiB = `avg(avg_over_time(kubecost_network_region_egress_cost{%s}[%s])) by (%s)`
  1046. cfg := pds.promConfig
  1047. durStr := timeutil.DurationString(end.Sub(start))
  1048. if durStr == "" {
  1049. panic("failed to parse duration string passed to QueryNetRegionPricePerGiB")
  1050. }
  1051. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1052. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNetRegionCostPerGiB)
  1053. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1054. return source.NewFuture(source.DecodeNetRegionPricePerGiBResult, ctx.QueryAtTime(queryNetRegionCostPerGiB, end))
  1055. }
  1056. func (pds *PrometheusMetricsQuerier) QueryNetInternetGiB(start, end time.Time) *source.Future[source.NetInternetGiBResult] {
  1057. const queryName = "QueryNetInternetGiB"
  1058. const queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true", %s}[%s:%dm])) by (pod_name, namespace, uid, %s) / 1024 / 1024 / 1024`
  1059. cfg := pds.promConfig
  1060. minsPerResolution := cfg.DataResolutionMinutes
  1061. durStr := pds.durationStringFor(start, end, minsPerResolution, true)
  1062. if durStr == "" {
  1063. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1064. }
  1065. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
  1066. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNetInternetGiB)
  1067. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1068. return source.NewFuture(source.DecodeNetInternetGiBResult, ctx.QueryAtTime(queryNetInternetGiB, end))
  1069. }
  1070. func (pds *PrometheusMetricsQuerier) QueryNetInternetPricePerGiB(start, end time.Time) *source.Future[source.NetInternetPricePerGiBResult] {
  1071. const queryName = "QueryNetInternetPricePerGiB"
  1072. const queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{%s}[%s])) by (%s)`
  1073. cfg := pds.promConfig
  1074. durStr := timeutil.DurationString(end.Sub(start))
  1075. if durStr == "" {
  1076. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1077. }
  1078. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1079. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNetInternetCostPerGiB)
  1080. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1081. return source.NewFuture(source.DecodeNetInternetPricePerGiBResult, ctx.QueryAtTime(queryNetInternetCostPerGiB, end))
  1082. }
  1083. func (pds *PrometheusMetricsQuerier) QueryNetInternetServiceGiB(start, end time.Time) *source.Future[source.NetInternetServiceGiBResult] {
  1084. const queryName = "QueryNetInternetServiceGiB"
  1085. const queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true", %s}[%s:%dm])) by (pod_name, namespace, service, uid, %s) / 1024 / 1024 / 1024`
  1086. cfg := pds.promConfig
  1087. minsPerResolution := cfg.DataResolutionMinutes
  1088. durStr := pds.durationStringFor(start, end, minsPerResolution, true)
  1089. if durStr == "" {
  1090. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1091. }
  1092. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
  1093. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNetInternetGiB)
  1094. ctx := pds.promContexts.NewNamedContext(NetworkInsightsContextName)
  1095. return source.NewFuture(source.DecodeNetInternetServiceGiBResult, ctx.QueryAtTime(queryNetInternetGiB, end))
  1096. }
  1097. func (pds *PrometheusMetricsQuerier) QueryNetNatGatewayPricePerGiB(start, end time.Time) *source.Future[source.NetNatGatewayPricePerGiBResult] {
  1098. const queryName = "QueryNetNatGatewayPricePerGiB"
  1099. const queryFmtNetNatGatewayPricePerGiB = `avg(avg_over_time(kubecost_network_nat_gateway_egress_cost{%s}[%s])) by (%s)`
  1100. cfg := pds.promConfig
  1101. durStr := timeutil.DurationString(end.Sub(start))
  1102. if durStr == "" {
  1103. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1104. }
  1105. queryNetNatGatewayPricePerGiB := fmt.Sprintf(queryFmtNetNatGatewayPricePerGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1106. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNetNatGatewayPricePerGiB)
  1107. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1108. return source.NewFuture(source.DecodeNetNatGatewayPricePerGiBResult, ctx.QueryAtTime(queryNetNatGatewayPricePerGiB, end))
  1109. }
  1110. func (pds *PrometheusMetricsQuerier) QueryNetNatGatewayGiB(start, end time.Time) *source.Future[source.NetNatGatewayGiBResult] {
  1111. const queryName = "QueryNetNatGatewayGiB"
  1112. const queryFmtNetNatGatewayGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{nat_gateway="true", %s}[%s:%dm])) by (pod_name, namespace, service, uid, %s) / 1024 / 1024 / 1024`
  1113. cfg := pds.promConfig
  1114. minsPerResolution := cfg.DataResolutionMinutes
  1115. durStr := pds.durationStringFor(start, end, minsPerResolution, true)
  1116. if durStr == "" {
  1117. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1118. }
  1119. queryNetNatGatewayGiB := fmt.Sprintf(queryFmtNetNatGatewayGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
  1120. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNetNatGatewayGiB)
  1121. ctx := pds.promContexts.NewNamedContext(NetworkInsightsContextName)
  1122. return source.NewFuture(source.DecodeNetNatGatewayGiBResult, ctx.QueryAtTime(queryNetNatGatewayGiB, end))
  1123. }
  1124. func (pds *PrometheusMetricsQuerier) QueryNetTransferBytes(start, end time.Time) *source.Future[source.NetTransferBytesResult] {
  1125. const queryName = "QueryNetTransferBytes"
  1126. const queryFmtNetTransferBytes = `sum(increase(container_network_transmit_bytes_total{pod!="", %s}[%s:%dm])) by (pod_name, pod, namespace, uid, %s)`
  1127. cfg := pds.promConfig
  1128. minsPerResolution := cfg.DataResolutionMinutes
  1129. durStr := pds.durationStringFor(start, end, minsPerResolution, true)
  1130. if durStr == "" {
  1131. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1132. }
  1133. queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
  1134. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNetTransferBytes)
  1135. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1136. return source.NewFuture(source.DecodeNetTransferBytesResult, ctx.QueryAtTime(queryNetTransferBytes, end))
  1137. }
  1138. func (pds *PrometheusMetricsQuerier) QueryNetZoneIngressGiB(start, end time.Time) *source.Future[source.NetZoneIngressGiBResult] {
  1139. const queryName = "QueryNetZoneIngressGiB"
  1140. const queryFmtIngNetZoneGiB = `sum(increase(kubecost_pod_network_ingress_bytes_total{internet="false", same_zone="false", same_region="true", %s}[%s:%dm])) by (pod_name, namespace, uid, %s) / 1024 / 1024 / 1024`
  1141. cfg := pds.promConfig
  1142. minsPerResolution := cfg.DataResolutionMinutes
  1143. durStr := pds.durationStringFor(start, end, minsPerResolution, true)
  1144. if durStr == "" {
  1145. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1146. }
  1147. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtIngNetZoneGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
  1148. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNetZoneCostPerGiB)
  1149. ctx := pds.promContexts.NewNamedContext(NetworkInsightsContextName)
  1150. return source.NewFuture(source.DecodeNetZoneIngressGiBResult, ctx.QueryAtTime(queryNetZoneCostPerGiB, end))
  1151. }
  1152. func (pds *PrometheusMetricsQuerier) QueryNetRegionIngressGiB(start, end time.Time) *source.Future[source.NetRegionIngressGiBResult] {
  1153. const queryName = "QueryNetRegionIngressGiB"
  1154. const queryFmtIngNetRegionGiB = `sum(increase(kubecost_pod_network_ingress_bytes_total{internet="false", same_zone="false", same_region="false", %s}[%s:%dm])) by (pod_name, namespace, uid, %s) / 1024 / 1024 / 1024`
  1155. cfg := pds.promConfig
  1156. minsPerResolution := cfg.DataResolutionMinutes
  1157. durStr := pds.durationStringFor(start, end, minsPerResolution, true)
  1158. if durStr == "" {
  1159. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1160. }
  1161. queryNetRegionIngGiB := fmt.Sprintf(queryFmtIngNetRegionGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
  1162. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNetRegionIngGiB)
  1163. ctx := pds.promContexts.NewNamedContext(NetworkInsightsContextName)
  1164. return source.NewFuture(source.DecodeNetRegionIngressGiBResult, ctx.QueryAtTime(queryNetRegionIngGiB, end))
  1165. }
  1166. func (pds *PrometheusMetricsQuerier) QueryNetInternetIngressGiB(start, end time.Time) *source.Future[source.NetInternetIngressGiBResult] {
  1167. const queryName = "QueryNetInternetIngressGiB"
  1168. const queryFmtNetIngInternetGiB = `sum(increase(kubecost_pod_network_ingress_bytes_total{internet="true", %s}[%s:%dm])) by (pod_name, namespace, uid, %s) / 1024 / 1024 / 1024`
  1169. cfg := pds.promConfig
  1170. minsPerResolution := cfg.DataResolutionMinutes
  1171. durStr := pds.durationStringFor(start, end, minsPerResolution, true)
  1172. if durStr == "" {
  1173. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1174. }
  1175. queryNetIngInternetGiB := fmt.Sprintf(queryFmtNetIngInternetGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
  1176. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNetIngInternetGiB)
  1177. ctx := pds.promContexts.NewNamedContext(NetworkInsightsContextName)
  1178. return source.NewFuture(source.DecodeNetInternetIngressGiBResult, ctx.QueryAtTime(queryNetIngInternetGiB, end))
  1179. }
  1180. func (pds *PrometheusMetricsQuerier) QueryNetInternetServiceIngressGiB(start, end time.Time) *source.Future[source.NetInternetServiceIngressGiBResult] {
  1181. const queryName = "QueryNetInternetServiceIngressGiB"
  1182. const queryFmtIngNetInternetGiB = `sum(increase(kubecost_pod_network_ingress_bytes_total{internet="true", %s}[%s:%dm])) by (pod_name, namespace, service, uid, %s) / 1024 / 1024 / 1024`
  1183. cfg := pds.promConfig
  1184. minsPerResolution := cfg.DataResolutionMinutes
  1185. durStr := pds.durationStringFor(start, end, minsPerResolution, true)
  1186. if durStr == "" {
  1187. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1188. }
  1189. queryNetIngInternetGiB := fmt.Sprintf(queryFmtIngNetInternetGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
  1190. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNetIngInternetGiB)
  1191. ctx := pds.promContexts.NewNamedContext(NetworkInsightsContextName)
  1192. return source.NewFuture(source.DecodeNetInternetServiceIngressGiBResult, ctx.QueryAtTime(queryNetIngInternetGiB, end))
  1193. }
  1194. func (pds *PrometheusMetricsQuerier) QueryNetNatGatewayIngressPricePerGiB(start, end time.Time) *source.Future[source.NetNatGatewayPricePerGiBResult] {
  1195. const queryName = "QueryNetNatGatewayIngressPricePerGiB"
  1196. const queryFmtNetNatGatewayIngressPricePerGiB = `avg(avg_over_time(kubecost_network_nat_gateway_ingress_cost{%s}[%s])) by (%s)`
  1197. cfg := pds.promConfig
  1198. durStr := timeutil.DurationString(end.Sub(start))
  1199. if durStr == "" {
  1200. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1201. }
  1202. queryNetNatGatewayIngressPricePerGiB := fmt.Sprintf(queryFmtNetNatGatewayIngressPricePerGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1203. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNetNatGatewayIngressPricePerGiB)
  1204. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1205. return source.NewFuture(source.DecodeNetNatGatewayPricePerGiBResult, ctx.QueryAtTime(queryNetNatGatewayIngressPricePerGiB, end))
  1206. }
  1207. func (pds *PrometheusMetricsQuerier) QueryNetNatGatewayIngressGiB(start, end time.Time) *source.Future[source.NetNatGatewayIngressGiBResult] {
  1208. const queryName = "QueryNetNatGatewayIngressGiB"
  1209. const queryFmtNetNatGatewayIngressGiB = `sum(increase(kubecost_pod_network_ingress_bytes_total{nat_gateway="true", %s}[%s:%dm])) by (pod_name, namespace, service, uid, %s) / 1024 / 1024 / 1024`
  1210. cfg := pds.promConfig
  1211. minsPerResolution := cfg.DataResolutionMinutes
  1212. durStr := pds.durationStringFor(start, end, minsPerResolution, true)
  1213. if durStr == "" {
  1214. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1215. }
  1216. queryNetNatGatewayIngressGiB := fmt.Sprintf(queryFmtNetNatGatewayIngressGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
  1217. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNetNatGatewayIngressGiB)
  1218. ctx := pds.promContexts.NewNamedContext(NetworkInsightsContextName)
  1219. return source.NewFuture(source.DecodeNetNatGatewayIngressGiBResult, ctx.QueryAtTime(queryNetNatGatewayIngressGiB, end))
  1220. }
  1221. func (pds *PrometheusMetricsQuerier) QueryNetReceiveBytes(start, end time.Time) *source.Future[source.NetReceiveBytesResult] {
  1222. const queryName = "QueryNetReceiveBytes"
  1223. const queryFmtNetReceiveBytes = `sum(increase(container_network_receive_bytes_total{pod!="", %s}[%s:%dm])) by (pod_name, pod, namespace, uid, %s)`
  1224. cfg := pds.promConfig
  1225. minsPerResolution := cfg.DataResolutionMinutes
  1226. durStr := pds.durationStringFor(start, end, minsPerResolution, true)
  1227. if durStr == "" {
  1228. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1229. }
  1230. queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
  1231. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNetReceiveBytes)
  1232. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1233. return source.NewFuture(source.DecodeNetReceiveBytesResult, ctx.QueryAtTime(queryNetReceiveBytes, end))
  1234. }
  1235. func (pds *PrometheusMetricsQuerier) QueryNamespaceInfo(start, end time.Time) *source.Future[source.NamespaceInfoResult] {
  1236. const queryName = "QueryNamespaceInfo"
  1237. const queryFmtNamespaceInfo = `avg(avg_over_time(namespace_info{%s}[%s])) by (%s, uid, namespace)`
  1238. cfg := pds.promConfig
  1239. durStr := timeutil.DurationString(end.Sub(start))
  1240. if durStr == "" {
  1241. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1242. }
  1243. queryNamespaceInfo := fmt.Sprintf(queryFmtNamespaceInfo, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1244. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNamespaceInfo)
  1245. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1246. return source.NewFuture(source.DecodeNamespaceInfoResult, ctx.QueryAtTime(queryNamespaceInfo, end))
  1247. }
  1248. // Note: namespace_info is not currently emitted
  1249. func (pds *PrometheusMetricsQuerier) QueryNamespaceUptime(start, end time.Time) *source.Future[source.UptimeResult] {
  1250. const queryName = "QueryNamespaceUptime"
  1251. const queryFmtNamespaceUptime = `avg(namespace_info{%s}) by (%s, uid)[%s:%dm]`
  1252. cfg := pds.promConfig
  1253. minsPerResolution := cfg.DataResolutionMinutes
  1254. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  1255. if durStr == "" {
  1256. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1257. }
  1258. queryNamespaceUptime := fmt.Sprintf(queryFmtNamespaceUptime, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  1259. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryFmtNamespaceUptime)
  1260. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1261. return source.NewFuture(source.DecodeUptimeResult, ctx.QueryAtTime(queryNamespaceUptime, end))
  1262. }
  1263. func (pds *PrometheusMetricsQuerier) QueryNamespaceLabels(start, end time.Time) *source.Future[source.NamespaceLabelsResult] {
  1264. const queryName = "QueryNamespaceLabels"
  1265. const queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels{%s}[%s])`
  1266. cfg := pds.promConfig
  1267. durStr := timeutil.DurationString(end.Sub(start))
  1268. if durStr == "" {
  1269. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1270. }
  1271. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, cfg.ClusterFilter, durStr)
  1272. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNamespaceLabels)
  1273. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1274. return source.NewFuture(source.DecodeNamespaceLabelsResult, ctx.QueryAtTime(queryNamespaceLabels, end))
  1275. }
  1276. func (pds *PrometheusMetricsQuerier) QueryNamespaceAnnotations(start, end time.Time) *source.Future[source.NamespaceAnnotationsResult] {
  1277. const queryName = "QueryNamespaceAnnotations"
  1278. const queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations{%s}[%s])`
  1279. cfg := pds.promConfig
  1280. durStr := timeutil.DurationString(end.Sub(start))
  1281. if durStr == "" {
  1282. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1283. }
  1284. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, cfg.ClusterFilter, durStr)
  1285. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryNamespaceAnnotations)
  1286. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1287. return source.NewFuture(source.DecodeNamespaceAnnotationsResult, ctx.QueryAtTime(queryNamespaceAnnotations, end))
  1288. }
  1289. func (pds *PrometheusMetricsQuerier) QueryPodLabels(start, end time.Time) *source.Future[source.PodLabelsResult] {
  1290. const queryName = "QueryPodLabels"
  1291. const queryFmtPodLabels = `avg_over_time(kube_pod_labels{%s}[%s])`
  1292. cfg := pds.promConfig
  1293. durStr := timeutil.DurationString(end.Sub(start))
  1294. if durStr == "" {
  1295. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1296. }
  1297. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, cfg.ClusterFilter, durStr)
  1298. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPodLabels)
  1299. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1300. return source.NewFuture(source.DecodePodLabelsResult, ctx.QueryAtTime(queryPodLabels, end))
  1301. }
  1302. func (pds *PrometheusMetricsQuerier) QueryPodAnnotations(start, end time.Time) *source.Future[source.PodAnnotationsResult] {
  1303. const queryName = "QueryPodAnnotations"
  1304. const queryFmtPodAnnotations = `avg_over_time(kube_pod_annotations{%s}[%s])`
  1305. cfg := pds.promConfig
  1306. durStr := timeutil.DurationString(end.Sub(start))
  1307. if durStr == "" {
  1308. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1309. }
  1310. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, cfg.ClusterFilter, durStr)
  1311. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPodAnnotations)
  1312. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1313. return source.NewFuture(source.DecodePodAnnotationsResult, ctx.QueryAtTime(queryPodAnnotations, end))
  1314. }
  1315. func (pds *PrometheusMetricsQuerier) QueryServiceInfo(start, end time.Time) *source.Future[source.ServiceInfoResult] {
  1316. const queryName = "QueryServiceInfo"
  1317. const queryFmtServiceInfo = `avg(avg_over_time(service_selector_labels{%s}[%s])) by (%s, uid, namespace_uid, service, service_type)`
  1318. cfg := pds.promConfig
  1319. durStr := timeutil.DurationString(end.Sub(start))
  1320. if durStr == "" {
  1321. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1322. }
  1323. queryServiceInfo := fmt.Sprintf(queryFmtServiceInfo, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1324. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryServiceInfo)
  1325. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1326. return source.NewFuture(source.DecodeServiceInfoResult, ctx.QueryAtTime(queryServiceInfo, end))
  1327. }
  1328. func (pds *PrometheusMetricsQuerier) QueryServiceUptime(start, end time.Time) *source.Future[source.UptimeResult] {
  1329. const queryName = "QueryServiceUptime"
  1330. const queryFmtServiceUptime = `avg(service_selector_labels{%s}) by (%s, uid)[%s:%dm]`
  1331. cfg := pds.promConfig
  1332. minsPerResolution := cfg.DataResolutionMinutes
  1333. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  1334. if durStr == "" {
  1335. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1336. }
  1337. queryServiceUptime := fmt.Sprintf(queryFmtServiceUptime, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  1338. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryServiceUptime)
  1339. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1340. return source.NewFuture(source.DecodeUptimeResult, ctx.QueryAtTime(queryServiceUptime, end))
  1341. }
  1342. func (pds *PrometheusMetricsQuerier) QueryServiceSelectorLabels(start, end time.Time) *source.Future[source.ServiceLabelsResult] {
  1343. const queryName = "QueryServiceSelectorLabels"
  1344. const queryFmtServiceSelectorLabels = `avg_over_time(service_selector_labels{%s}[%s])`
  1345. cfg := pds.promConfig
  1346. durStr := timeutil.DurationString(end.Sub(start))
  1347. if durStr == "" {
  1348. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1349. }
  1350. queryServiceSelectorLabels := fmt.Sprintf(queryFmtServiceSelectorLabels, cfg.ClusterFilter, durStr)
  1351. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryServiceSelectorLabels)
  1352. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1353. return source.NewFuture(source.DecodeServiceLabelsResult, ctx.QueryAtTime(queryServiceSelectorLabels, end))
  1354. }
  1355. func (pds *PrometheusMetricsQuerier) QueryDeploymentInfo(start, end time.Time) *source.Future[source.DeploymentInfoResult] {
  1356. const queryName = "QueryDeploymentInfo"
  1357. const queryFmtDeploymentInfo = `avg(avg_over_time(deployment_info{%s}[%s])) by (%s, uid, namespace_uid, deployment)`
  1358. cfg := pds.promConfig
  1359. durStr := timeutil.DurationString(end.Sub(start))
  1360. if durStr == "" {
  1361. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1362. }
  1363. queryDeploymentInfo := fmt.Sprintf(queryFmtDeploymentInfo, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1364. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryDeploymentInfo)
  1365. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1366. return source.NewFuture(source.DecodeDeploymentInfoResult, ctx.QueryAtTime(queryDeploymentInfo, end))
  1367. }
  1368. func (pds *PrometheusMetricsQuerier) QueryDeploymentUptime(start, end time.Time) *source.Future[source.UptimeResult] {
  1369. const queryName = "QueryDeploymentUptime"
  1370. const queryFmtDeploymentUptime = `avg(deployment_info{%s}) by (%s, uid)[%s:%dm]`
  1371. cfg := pds.promConfig
  1372. minsPerResolution := cfg.DataResolutionMinutes
  1373. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  1374. if durStr == "" {
  1375. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1376. }
  1377. queryDeploymentUptime := fmt.Sprintf(queryFmtDeploymentUptime, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  1378. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryDeploymentUptime)
  1379. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1380. return source.NewFuture(source.DecodeUptimeResult, ctx.QueryAtTime(queryDeploymentUptime, end))
  1381. }
  1382. func (pds *PrometheusMetricsQuerier) QueryDeploymentLabels(start, end time.Time) *source.Future[source.LabelsResult] {
  1383. const queryName = "QueryDeploymentLabels"
  1384. const queryFmtDeploymentLabels = `avg_over_time(deployment_labels{%s}[%s])`
  1385. cfg := pds.promConfig
  1386. durStr := timeutil.DurationString(end.Sub(start))
  1387. if durStr == "" {
  1388. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1389. }
  1390. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, cfg.ClusterFilter, durStr)
  1391. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryDeploymentLabels)
  1392. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1393. return source.NewFuture(source.DecodeLabelsResult, ctx.QueryAtTime(queryDeploymentLabels, end))
  1394. }
  1395. func (pds *PrometheusMetricsQuerier) QueryDeploymentAnnotations(start, end time.Time) *source.Future[source.AnnotationsResult] {
  1396. const queryName = "QueryDeploymentAnnotations"
  1397. const queryFmtDeploymentAnnotations = `avg_over_time(deployment_annotations{%s}[%s])`
  1398. cfg := pds.promConfig
  1399. durStr := timeutil.DurationString(end.Sub(start))
  1400. if durStr == "" {
  1401. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1402. }
  1403. queryDeploymentAnnotations := fmt.Sprintf(queryFmtDeploymentAnnotations, cfg.ClusterFilter, durStr)
  1404. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryDeploymentAnnotations)
  1405. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1406. return source.NewFuture(source.DecodeAnnotationsResult, ctx.QueryAtTime(queryDeploymentAnnotations, end))
  1407. }
  1408. func (pds *PrometheusMetricsQuerier) QueryDeploymentMatchLabels(start, end time.Time) *source.Future[source.DeploymentLabelsResult] {
  1409. const queryName = "QueryDeploymentMatchLabels"
  1410. const queryFmtDeploymentLabels = `avg_over_time(deployment_match_labels{%s}[%s])`
  1411. cfg := pds.promConfig
  1412. durStr := timeutil.DurationString(end.Sub(start))
  1413. if durStr == "" {
  1414. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1415. }
  1416. queryDeploymentMatchLabels := fmt.Sprintf(queryFmtDeploymentLabels, cfg.ClusterFilter, durStr)
  1417. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryDeploymentMatchLabels)
  1418. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1419. return source.NewFuture(source.DecodeDeploymentLabelsResult, ctx.QueryAtTime(queryDeploymentMatchLabels, end))
  1420. }
  1421. func (pds *PrometheusMetricsQuerier) QueryStatefulSetInfo(start, end time.Time) *source.Future[source.StatefulSetInfoResult] {
  1422. const queryName = "QueryStatefulSetInfo"
  1423. const queryFmtStatefulSetInfo = `avg(avg_over_time(statefulset_info{%s}[%s])) by (%s, uid, namespace_uid, statefulSet)`
  1424. cfg := pds.promConfig
  1425. durStr := timeutil.DurationString(end.Sub(start))
  1426. if durStr == "" {
  1427. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1428. }
  1429. queryStatefulSetInfo := fmt.Sprintf(queryFmtStatefulSetInfo, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1430. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryStatefulSetInfo)
  1431. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1432. return source.NewFuture(source.DecodeStatefulSetInfoResult, ctx.QueryAtTime(queryStatefulSetInfo, end))
  1433. }
  1434. func (pds *PrometheusMetricsQuerier) QueryStatefulSetUptime(start, end time.Time) *source.Future[source.UptimeResult] {
  1435. const queryName = "QueryStatefulSetUptime"
  1436. const queryFmtStatefulSetUptime = `avg(statefulset_info{%s}) by (%s, uid)[%s:%dm]`
  1437. cfg := pds.promConfig
  1438. minsPerResolution := cfg.DataResolutionMinutes
  1439. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  1440. if durStr == "" {
  1441. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1442. }
  1443. queryStatefulSetUptime := fmt.Sprintf(queryFmtStatefulSetUptime, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  1444. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryStatefulSetUptime)
  1445. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1446. return source.NewFuture(source.DecodeUptimeResult, ctx.QueryAtTime(queryStatefulSetUptime, end))
  1447. }
  1448. func (pds *PrometheusMetricsQuerier) QueryStatefulSetLabels(start, end time.Time) *source.Future[source.LabelsResult] {
  1449. const queryName = "QueryStatefulSetLabels"
  1450. const queryFmtStatefulSetLabels = `avg_over_time(statefulset_labels{%s}[%s])`
  1451. cfg := pds.promConfig
  1452. durStr := timeutil.DurationString(end.Sub(start))
  1453. if durStr == "" {
  1454. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1455. }
  1456. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, cfg.ClusterFilter, durStr)
  1457. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryStatefulSetLabels)
  1458. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1459. return source.NewFuture(source.DecodeLabelsResult, ctx.QueryAtTime(queryStatefulSetLabels, end))
  1460. }
  1461. func (pds *PrometheusMetricsQuerier) QueryStatefulSetAnnotations(start, end time.Time) *source.Future[source.AnnotationsResult] {
  1462. const queryName = "QueryStatefulSetAnnotations"
  1463. const queryFmtStatefulSetAnnotations = `avg_over_time(statefulset_annotations{%s}[%s])`
  1464. cfg := pds.promConfig
  1465. durStr := timeutil.DurationString(end.Sub(start))
  1466. if durStr == "" {
  1467. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1468. }
  1469. queryStatefulSetAnnotations := fmt.Sprintf(queryFmtStatefulSetAnnotations, cfg.ClusterFilter, durStr)
  1470. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryStatefulSetAnnotations)
  1471. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1472. return source.NewFuture(source.DecodeAnnotationsResult, ctx.QueryAtTime(queryStatefulSetAnnotations, end))
  1473. }
  1474. func (pds *PrometheusMetricsQuerier) QueryStatefulSetMatchLabels(start, end time.Time) *source.Future[source.StatefulSetLabelsResult] {
  1475. const queryName = "QueryStatefulSetMatchLabels"
  1476. const queryFmtStatefulSetMatchLabels = `avg_over_time(statefulSet_match_labels{%s}[%s])`
  1477. cfg := pds.promConfig
  1478. durStr := timeutil.DurationString(end.Sub(start))
  1479. if durStr == "" {
  1480. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1481. }
  1482. queryStatefulSetMatchLabels := fmt.Sprintf(queryFmtStatefulSetMatchLabels, cfg.ClusterFilter, durStr)
  1483. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryStatefulSetMatchLabels)
  1484. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1485. return source.NewFuture(source.DecodeStatefulSetLabelsResult, ctx.QueryAtTime(queryStatefulSetMatchLabels, end))
  1486. }
  1487. func (pds *PrometheusMetricsQuerier) QueryDaemonSetInfo(start, end time.Time) *source.Future[source.DaemonSetInfoResult] {
  1488. const queryName = "QueryDaemonSetInfo"
  1489. const queryFmtDaemonSetInfo = `avg(avg_over_time(daemonset_info{%s}[%s])) by (%s, uid, namespace_uid, daemonset)`
  1490. cfg := pds.promConfig
  1491. durStr := timeutil.DurationString(end.Sub(start))
  1492. if durStr == "" {
  1493. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1494. }
  1495. queryDaemonSetInfo := fmt.Sprintf(queryFmtDaemonSetInfo, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1496. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryDaemonSetInfo)
  1497. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1498. return source.NewFuture(source.DecodeDaemonSetInfoResult, ctx.QueryAtTime(queryDaemonSetInfo, end))
  1499. }
  1500. func (pds *PrometheusMetricsQuerier) QueryDaemonSetUptime(start, end time.Time) *source.Future[source.UptimeResult] {
  1501. const queryName = "QueryDaemonSetUptime"
  1502. const queryFmtDaemonSetUptime = `avg(daemonset_info{%s}) by (%s, uid)[%s:%dm]`
  1503. cfg := pds.promConfig
  1504. minsPerResolution := cfg.DataResolutionMinutes
  1505. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  1506. if durStr == "" {
  1507. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1508. }
  1509. queryDaemonSetUptime := fmt.Sprintf(queryFmtDaemonSetUptime, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  1510. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryDaemonSetUptime)
  1511. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1512. return source.NewFuture(source.DecodeUptimeResult, ctx.QueryAtTime(queryDaemonSetUptime, end))
  1513. }
  1514. func (pds *PrometheusMetricsQuerier) QueryDaemonSetLabels(start, end time.Time) *source.Future[source.LabelsResult] {
  1515. const queryName = "QueryDaemonSetLabels"
  1516. const queryFmtDaemonSetLabels = `avg_over_time(daemonset_labels{%s}[%s])`
  1517. cfg := pds.promConfig
  1518. durStr := timeutil.DurationString(end.Sub(start))
  1519. if durStr == "" {
  1520. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1521. }
  1522. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, cfg.ClusterFilter, durStr)
  1523. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryDaemonSetLabels)
  1524. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1525. return source.NewFuture(source.DecodeLabelsResult, ctx.QueryAtTime(queryDaemonSetLabels, end))
  1526. }
  1527. func (pds *PrometheusMetricsQuerier) QueryDaemonSetAnnotations(start, end time.Time) *source.Future[source.AnnotationsResult] {
  1528. const queryName = "QueryDaemonSetAnnotations"
  1529. const queryFmtDaemonSetAnnotations = `avg_over_time(daemonset_annotations{%s}[%s])`
  1530. cfg := pds.promConfig
  1531. durStr := timeutil.DurationString(end.Sub(start))
  1532. if durStr == "" {
  1533. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1534. }
  1535. queryDaemonSetAnnotations := fmt.Sprintf(queryFmtDaemonSetAnnotations, cfg.ClusterFilter, durStr)
  1536. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryDaemonSetAnnotations)
  1537. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1538. return source.NewFuture(source.DecodeAnnotationsResult, ctx.QueryAtTime(queryDaemonSetAnnotations, end))
  1539. }
  1540. func (pds *PrometheusMetricsQuerier) QueryJobInfo(start, end time.Time) *source.Future[source.JobInfoResult] {
  1541. const queryName = "QueryJobInfo"
  1542. const queryFmtJobInfo = `avg(avg_over_time(job_info{%s}[%s])) by (%s, uid, namespace_uid, job)`
  1543. cfg := pds.promConfig
  1544. durStr := timeutil.DurationString(end.Sub(start))
  1545. if durStr == "" {
  1546. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1547. }
  1548. queryJobInfo := fmt.Sprintf(queryFmtJobInfo, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1549. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryJobInfo)
  1550. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1551. return source.NewFuture(source.DecodeJobInfoResult, ctx.QueryAtTime(queryJobInfo, end))
  1552. }
  1553. func (pds *PrometheusMetricsQuerier) QueryJobUptime(start, end time.Time) *source.Future[source.UptimeResult] {
  1554. const queryName = "QueryJobUptime"
  1555. const queryFmtJobUptime = `avg(job_info{%s}) by (%s, uid)[%s:%dm]`
  1556. cfg := pds.promConfig
  1557. minsPerResolution := cfg.DataResolutionMinutes
  1558. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  1559. if durStr == "" {
  1560. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1561. }
  1562. queryJobUptime := fmt.Sprintf(queryFmtJobUptime, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  1563. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryJobUptime)
  1564. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1565. return source.NewFuture(source.DecodeUptimeResult, ctx.QueryAtTime(queryJobUptime, end))
  1566. }
  1567. func (pds *PrometheusMetricsQuerier) QueryJobLabels(start, end time.Time) *source.Future[source.LabelsResult] {
  1568. const queryName = "QueryJobLabels"
  1569. const queryFmtJobLabels = `avg_over_time(job_labels{%s}[%s])`
  1570. cfg := pds.promConfig
  1571. durStr := timeutil.DurationString(end.Sub(start))
  1572. if durStr == "" {
  1573. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1574. }
  1575. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, cfg.ClusterFilter, durStr)
  1576. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryJobLabels)
  1577. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1578. return source.NewFuture(source.DecodeLabelsResult, ctx.QueryAtTime(queryJobLabels, end))
  1579. }
  1580. func (pds *PrometheusMetricsQuerier) QueryJobAnnotations(start, end time.Time) *source.Future[source.AnnotationsResult] {
  1581. const queryName = "QueryJobAnnotations"
  1582. const queryFmtJobAnnotations = `avg_over_time(job_annotations{%s}[%s])`
  1583. cfg := pds.promConfig
  1584. durStr := timeutil.DurationString(end.Sub(start))
  1585. if durStr == "" {
  1586. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1587. }
  1588. queryJobAnnotations := fmt.Sprintf(queryFmtJobAnnotations, cfg.ClusterFilter, durStr)
  1589. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryJobAnnotations)
  1590. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1591. return source.NewFuture(source.DecodeAnnotationsResult, ctx.QueryAtTime(queryJobAnnotations, end))
  1592. }
  1593. func (pds *PrometheusMetricsQuerier) QueryCronJobInfo(start, end time.Time) *source.Future[source.CronJobInfoResult] {
  1594. const queryName = "QueryCronJobInfo"
  1595. const queryFmtCronJobInfo = `avg(avg_over_time(cronjob_info{%s}[%s])) by (%s, uid, namespace_uid, cronjob)`
  1596. cfg := pds.promConfig
  1597. durStr := timeutil.DurationString(end.Sub(start))
  1598. if durStr == "" {
  1599. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1600. }
  1601. queryCronJobInfo := fmt.Sprintf(queryFmtCronJobInfo, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1602. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryCronJobInfo)
  1603. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1604. return source.NewFuture(source.DecodeCronJobInfoResult, ctx.QueryAtTime(queryCronJobInfo, end))
  1605. }
  1606. func (pds *PrometheusMetricsQuerier) QueryCronJobUptime(start, end time.Time) *source.Future[source.UptimeResult] {
  1607. const queryName = "QueryCronJobUptime"
  1608. const queryFmtCronJobUptime = `avg(cronjob_info{%s}) by (%s, uid)[%s:%dm]`
  1609. cfg := pds.promConfig
  1610. minsPerResolution := cfg.DataResolutionMinutes
  1611. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  1612. if durStr == "" {
  1613. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1614. }
  1615. queryCronJobUptime := fmt.Sprintf(queryFmtCronJobUptime, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  1616. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryCronJobUptime)
  1617. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1618. return source.NewFuture(source.DecodeUptimeResult, ctx.QueryAtTime(queryCronJobUptime, end))
  1619. }
  1620. func (pds *PrometheusMetricsQuerier) QueryCronJobLabels(start, end time.Time) *source.Future[source.LabelsResult] {
  1621. const queryName = "QueryCronJobLabels"
  1622. const queryFmtCronJobLabels = `avg_over_time(cronjob_labels{%s}[%s])`
  1623. cfg := pds.promConfig
  1624. durStr := timeutil.DurationString(end.Sub(start))
  1625. if durStr == "" {
  1626. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1627. }
  1628. queryCronJobLabels := fmt.Sprintf(queryFmtCronJobLabels, cfg.ClusterFilter, durStr)
  1629. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryCronJobLabels)
  1630. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1631. return source.NewFuture(source.DecodeLabelsResult, ctx.QueryAtTime(queryCronJobLabels, end))
  1632. }
  1633. func (pds *PrometheusMetricsQuerier) QueryCronJobAnnotations(start, end time.Time) *source.Future[source.AnnotationsResult] {
  1634. const queryName = "QueryCronJobAnnotations"
  1635. const queryFmtCronJobAnnotations = `avg_over_time(cronjob_annotations{%s}[%s])`
  1636. cfg := pds.promConfig
  1637. durStr := timeutil.DurationString(end.Sub(start))
  1638. if durStr == "" {
  1639. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1640. }
  1641. queryCronJobAnnotations := fmt.Sprintf(queryFmtCronJobAnnotations, cfg.ClusterFilter, durStr)
  1642. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryCronJobAnnotations)
  1643. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1644. return source.NewFuture(source.DecodeAnnotationsResult, ctx.QueryAtTime(queryCronJobAnnotations, end))
  1645. }
  1646. func (pds *PrometheusMetricsQuerier) QueryReplicaSetInfo(start, end time.Time) *source.Future[source.ReplicaSetInfoResult] {
  1647. const queryName = "QueryReplicaSetInfo"
  1648. const queryFmtReplicaSetInfo = `avg(avg_over_time(replicaset_info{%s}[%s])) by (%s, uid, namespace_uid, replicaset)`
  1649. cfg := pds.promConfig
  1650. durStr := timeutil.DurationString(end.Sub(start))
  1651. if durStr == "" {
  1652. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1653. }
  1654. queryReplicaSetInfo := fmt.Sprintf(queryFmtReplicaSetInfo, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1655. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryReplicaSetInfo)
  1656. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1657. return source.NewFuture(source.DecodeReplicaSetInfoResult, ctx.QueryAtTime(queryReplicaSetInfo, end))
  1658. }
  1659. func (pds *PrometheusMetricsQuerier) QueryReplicaSetUptime(start, end time.Time) *source.Future[source.UptimeResult] {
  1660. const queryName = "QueryReplicaSetUptime"
  1661. const queryFmtReplicaSetUptime = `avg(replicaset_info{%s}) by (%s, uid)[%s:%dm]`
  1662. cfg := pds.promConfig
  1663. minsPerResolution := cfg.DataResolutionMinutes
  1664. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  1665. if durStr == "" {
  1666. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1667. }
  1668. queryReplicaSetUptime := fmt.Sprintf(queryFmtReplicaSetUptime, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  1669. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryReplicaSetUptime)
  1670. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1671. return source.NewFuture(source.DecodeUptimeResult, ctx.QueryAtTime(queryReplicaSetUptime, end))
  1672. }
  1673. func (pds *PrometheusMetricsQuerier) QueryReplicaSetLabels(start, end time.Time) *source.Future[source.LabelsResult] {
  1674. const queryName = "QueryReplicaSetLabels"
  1675. const queryFmtReplicaSetLabels = `avg_over_time(replicaset_labels{%s}[%s])`
  1676. cfg := pds.promConfig
  1677. durStr := timeutil.DurationString(end.Sub(start))
  1678. if durStr == "" {
  1679. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1680. }
  1681. queryReplicaSetLabels := fmt.Sprintf(queryFmtReplicaSetLabels, cfg.ClusterFilter, durStr)
  1682. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryReplicaSetLabels)
  1683. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1684. return source.NewFuture(source.DecodeLabelsResult, ctx.QueryAtTime(queryReplicaSetLabels, end))
  1685. }
  1686. func (pds *PrometheusMetricsQuerier) QueryReplicaSetAnnotations(start, end time.Time) *source.Future[source.AnnotationsResult] {
  1687. const queryName = "QueryReplicaSetAnnotations"
  1688. const queryFmtReplicaSetAnnotations = `avg_over_time(replicaset_annotations{%s}[%s])`
  1689. cfg := pds.promConfig
  1690. durStr := timeutil.DurationString(end.Sub(start))
  1691. if durStr == "" {
  1692. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1693. }
  1694. queryReplicaSetAnnotations := fmt.Sprintf(queryFmtReplicaSetAnnotations, cfg.ClusterFilter, durStr)
  1695. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryReplicaSetAnnotations)
  1696. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1697. return source.NewFuture(source.DecodeAnnotationsResult, ctx.QueryAtTime(queryReplicaSetAnnotations, end))
  1698. }
  1699. func (pds *PrometheusMetricsQuerier) QueryReplicaSetOwners(start, end time.Time) *source.Future[source.OwnerResult] {
  1700. const queryName = "QueryReplicaSetOwners"
  1701. const queryFmtReplicaSetOwners = `avg(avg_over_time(kube_replicaset_owner{%s}[%s])) by (%s, uid, owner_uid, owner_kind)`
  1702. cfg := pds.promConfig
  1703. durStr := timeutil.DurationString(end.Sub(start))
  1704. if durStr == "" {
  1705. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1706. }
  1707. queryReplicaSetOwners := fmt.Sprintf(queryFmtReplicaSetOwners, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1708. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryReplicaSetOwners)
  1709. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1710. return source.NewFuture(source.DecodeOwnerResult, ctx.QueryAtTime(queryReplicaSetOwners, end))
  1711. }
  1712. func (pds *PrometheusMetricsQuerier) QueryPodsWithDaemonSetOwner(start, end time.Time) *source.Future[source.PodsWithDaemonSetOwnerResult] {
  1713. const queryName = "QueryPodsWithDaemonSetOwner"
  1714. const queryFmtPodsWithDaemonSetOwner = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet", %s}[%s])) by (pod, owner_name, namespace, uid, %s)`
  1715. cfg := pds.promConfig
  1716. durStr := timeutil.DurationString(end.Sub(start))
  1717. if durStr == "" {
  1718. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1719. }
  1720. queryPodsWithDaemonSetOwner := fmt.Sprintf(queryFmtPodsWithDaemonSetOwner, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1721. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPodsWithDaemonSetOwner)
  1722. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1723. return source.NewFuture(source.DecodePodsWithDaemonSetOwnerResult, ctx.QueryAtTime(queryPodsWithDaemonSetOwner, end))
  1724. }
  1725. func (pds *PrometheusMetricsQuerier) QueryPodsWithJobOwner(start, end time.Time) *source.Future[source.PodsWithJobOwnerResult] {
  1726. const queryName = "QueryPodsWithJobOwner"
  1727. const queryFmtPodsWithJobOwner = `sum(avg_over_time(kube_pod_owner{owner_kind="Job", %s}[%s])) by (pod, owner_name, namespace, uid, %s)`
  1728. cfg := pds.promConfig
  1729. durStr := timeutil.DurationString(end.Sub(start))
  1730. if durStr == "" {
  1731. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1732. }
  1733. queryPodsWithJobOwner := fmt.Sprintf(queryFmtPodsWithJobOwner, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1734. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPodsWithJobOwner)
  1735. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1736. return source.NewFuture(source.DecodePodsWithJobOwnerResult, ctx.QueryAtTime(queryPodsWithJobOwner, end))
  1737. }
  1738. func (pds *PrometheusMetricsQuerier) QueryPodsWithReplicaSetOwner(start, end time.Time) *source.Future[source.PodsWithReplicaSetOwnerResult] {
  1739. const queryName = "QueryPodsWithReplicaSetOwner"
  1740. const queryFmtPodsWithReplicaSetOwner = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet", %s}[%s])) by (pod, owner_name, namespace, uid, %s)`
  1741. cfg := pds.promConfig
  1742. durStr := timeutil.DurationString(end.Sub(start))
  1743. if durStr == "" {
  1744. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1745. }
  1746. queryPodsWithReplicaSetOwner := fmt.Sprintf(queryFmtPodsWithReplicaSetOwner, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1747. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryPodsWithReplicaSetOwner)
  1748. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1749. return source.NewFuture(source.DecodePodsWithReplicaSetOwnerResult, ctx.QueryAtTime(queryPodsWithReplicaSetOwner, end))
  1750. }
  1751. func (pds *PrometheusMetricsQuerier) QueryReplicaSetsWithoutOwners(start, end time.Time) *source.Future[source.ReplicaSetsWithoutOwnersResult] {
  1752. const queryName = "QueryReplicaSetsWithoutOwners"
  1753. const queryFmtReplicaSetsWithoutOwners = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>", %s}[%s])) by (replicaset, namespace, uid, %s)`
  1754. cfg := pds.promConfig
  1755. durStr := timeutil.DurationString(end.Sub(start))
  1756. if durStr == "" {
  1757. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1758. }
  1759. queryReplicaSetsWithoutOwners := fmt.Sprintf(queryFmtReplicaSetsWithoutOwners, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1760. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryReplicaSetsWithoutOwners)
  1761. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1762. return source.NewFuture(source.DecodeReplicaSetsWithoutOwnersResult, ctx.QueryAtTime(queryReplicaSetsWithoutOwners, end))
  1763. }
  1764. func (pds *PrometheusMetricsQuerier) QueryReplicaSetsWithRollout(start, end time.Time) *source.Future[source.ReplicaSetsWithRolloutResult] {
  1765. const queryName = "QueryReplicaSetsWithRollout"
  1766. const queryFmtReplicaSetsWithRolloutOwner = `avg(avg_over_time(kube_replicaset_owner{owner_kind="Rollout", %s}[%s])) by (replicaset, namespace, owner_kind, owner_name, uid, %s)`
  1767. cfg := pds.promConfig
  1768. durStr := timeutil.DurationString(end.Sub(start))
  1769. if durStr == "" {
  1770. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1771. }
  1772. queryReplicaSetsWithRolloutOwner := fmt.Sprintf(queryFmtReplicaSetsWithRolloutOwner, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1773. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryReplicaSetsWithRolloutOwner)
  1774. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1775. return source.NewFuture(source.DecodeReplicaSetsWithRolloutResult, ctx.QueryAtTime(queryReplicaSetsWithRolloutOwner, end))
  1776. }
  1777. // Note: The ResourceQuota metrics are _not_ emitted at the moment. Leaving the query implementations here in case we add metric emission later on.
  1778. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaInfo(start, end time.Time) *source.Future[source.ResourceQuotaInfoResult] {
  1779. const queryName = "QueryResourceQuotaInfo"
  1780. const queryFmtResourceQuotaInfo = `avg(avg_over_time(resourcequota_info{%s}[%s])) by (%s, uid, namespace_uid, resourcequota)`
  1781. cfg := pds.promConfig
  1782. durStr := timeutil.DurationString(end.Sub(start))
  1783. if durStr == "" {
  1784. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1785. }
  1786. queryResourceQuotaInfo := fmt.Sprintf(queryFmtResourceQuotaInfo, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1787. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryResourceQuotaInfo)
  1788. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1789. return source.NewFuture(source.DecodeResourceQuotaInfoResult, ctx.QueryAtTime(queryResourceQuotaInfo, end))
  1790. }
  1791. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaUptime(start, end time.Time) *source.Future[source.UptimeResult] {
  1792. const queryName = "QueryResourceQuotaUptime"
  1793. const queryFmtResourceQuotaUptime = `avg(resourcequota_info{%s}) by (%s, uid)[%s:%dm]`
  1794. cfg := pds.promConfig
  1795. minsPerResolution := cfg.DataResolutionMinutes
  1796. durStr := pds.durationStringFor(start, end, minsPerResolution, false)
  1797. if durStr == "" {
  1798. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1799. }
  1800. queryResourceQuotaUptime := fmt.Sprintf(queryFmtResourceQuotaUptime, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  1801. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryFmtResourceQuotaUptime)
  1802. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1803. return source.NewFuture(source.DecodeUptimeResult, ctx.QueryAtTime(queryResourceQuotaUptime, end))
  1804. }
  1805. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaSpecCPURequestAverage(start, end time.Time) *source.Future[source.ResourceResult] {
  1806. const queryName = "QueryResourceQuotaSpecCPURequestAverage"
  1807. const queryFmtResourceQuotaSpecCPURequests = `avg(avg_over_time(resourcequota_spec_resource_requests{resource="cpu",unit="core", %s}[%s])) by (resourcequota, namespace, uid, %s)`
  1808. cfg := pds.promConfig
  1809. durStr := timeutil.DurationString(end.Sub(start))
  1810. if durStr == "" {
  1811. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1812. }
  1813. queryResourceQuotaSpecCPURequests := fmt.Sprintf(queryFmtResourceQuotaSpecCPURequests, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1814. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryResourceQuotaSpecCPURequests)
  1815. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1816. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryResourceQuotaSpecCPURequests, end))
  1817. }
  1818. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaSpecCPURequestMax(start, end time.Time) *source.Future[source.ResourceResult] {
  1819. const queryName = "QueryResourceQuotaSpecCPURequestMax"
  1820. const queryFmtResourceQuotaSpecCPURequests = `max(max_over_time(resourcequota_spec_resource_requests{resource="cpu",unit="core", %s}[%s])) by (resourcequota, namespace, uid, %s)`
  1821. cfg := pds.promConfig
  1822. durStr := timeutil.DurationString(end.Sub(start))
  1823. if durStr == "" {
  1824. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1825. }
  1826. queryResourceQuotaSpecCPURequests := fmt.Sprintf(queryFmtResourceQuotaSpecCPURequests, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1827. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryResourceQuotaSpecCPURequests)
  1828. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1829. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryResourceQuotaSpecCPURequests, end))
  1830. }
  1831. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaSpecRAMRequestAverage(start, end time.Time) *source.Future[source.ResourceResult] {
  1832. const queryName = "QueryResourceQuotaSpecRAMRequestAverage"
  1833. const queryFmtResourceQuotaSpecRAMRequests = `avg(avg_over_time(resourcequota_spec_resource_requests{resource="memory",unit="byte", %s}[%s])) by (resourcequota, namespace, uid, %s)`
  1834. cfg := pds.promConfig
  1835. durStr := timeutil.DurationString(end.Sub(start))
  1836. if durStr == "" {
  1837. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1838. }
  1839. queryResourceQuotaSpecRAMRequests := fmt.Sprintf(queryFmtResourceQuotaSpecRAMRequests, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1840. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryResourceQuotaSpecRAMRequests)
  1841. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1842. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryResourceQuotaSpecRAMRequests, end))
  1843. }
  1844. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaSpecRAMRequestMax(start, end time.Time) *source.Future[source.ResourceResult] {
  1845. const queryName = "QueryResourceQuotaSpecRAMRequestMax"
  1846. const queryFmtResourceQuotaSpecRAMRequests = `max(max_over_time(resourcequota_spec_resource_requests{resource="memory",unit="byte", %s}[%s])) by (resourcequota, namespace, uid, %s)`
  1847. cfg := pds.promConfig
  1848. durStr := timeutil.DurationString(end.Sub(start))
  1849. if durStr == "" {
  1850. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1851. }
  1852. queryResourceQuotaSpecRAMRequests := fmt.Sprintf(queryFmtResourceQuotaSpecRAMRequests, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1853. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryResourceQuotaSpecRAMRequests)
  1854. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1855. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryResourceQuotaSpecRAMRequests, end))
  1856. }
  1857. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaSpecCPULimitAverage(start, end time.Time) *source.Future[source.ResourceResult] {
  1858. const queryName = "QueryResourceQuotaSpecCPULimitAverage"
  1859. const queryFmtResourceQuotaSpecCPULimits = `avg(avg_over_time(resourcequota_spec_resource_limits{resource="cpu",unit="core", %s}[%s])) by (resourcequota, namespace, uid, %s)`
  1860. cfg := pds.promConfig
  1861. durStr := timeutil.DurationString(end.Sub(start))
  1862. if durStr == "" {
  1863. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1864. }
  1865. queryResourceQuotaSpecCPULimits := fmt.Sprintf(queryFmtResourceQuotaSpecCPULimits, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1866. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryResourceQuotaSpecCPULimits)
  1867. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1868. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryResourceQuotaSpecCPULimits, end))
  1869. }
  1870. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaSpecCPULimitMax(start, end time.Time) *source.Future[source.ResourceResult] {
  1871. const queryName = "QueryResourceQuotaSpecCPULimitMax"
  1872. const queryFmtResourceQuotaSpecCPULimits = `max(max_over_time(resourcequota_spec_resource_limits{resource="cpu",unit="core", %s}[%s])) by (resourcequota, namespace, uid, %s)`
  1873. cfg := pds.promConfig
  1874. durStr := timeutil.DurationString(end.Sub(start))
  1875. if durStr == "" {
  1876. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1877. }
  1878. queryResourceQuotaSpecCPULimits := fmt.Sprintf(queryFmtResourceQuotaSpecCPULimits, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1879. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryResourceQuotaSpecCPULimits)
  1880. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1881. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryResourceQuotaSpecCPULimits, end))
  1882. }
  1883. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaSpecRAMLimitAverage(start, end time.Time) *source.Future[source.ResourceResult] {
  1884. const queryName = "QueryResourceQuotaSpecRAMLimitAverage"
  1885. const queryFmtResourceQuotaSpecRAMLimits = `avg(avg_over_time(resourcequota_spec_resource_limits{resource="memory",unit="byte", %s}[%s])) by (resourcequota, namespace, uid, %s)`
  1886. cfg := pds.promConfig
  1887. durStr := timeutil.DurationString(end.Sub(start))
  1888. if durStr == "" {
  1889. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1890. }
  1891. queryResourceQuotaSpecRAMLimits := fmt.Sprintf(queryFmtResourceQuotaSpecRAMLimits, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1892. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryResourceQuotaSpecRAMLimits)
  1893. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1894. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryResourceQuotaSpecRAMLimits, end))
  1895. }
  1896. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaSpecRAMLimitMax(start, end time.Time) *source.Future[source.ResourceResult] {
  1897. const queryName = "QueryResourceQuotaSpecRAMLimitMax"
  1898. const queryFmtResourceQuotaSpecRAMLimits = `max(max_over_time(resourcequota_spec_resource_limits{resource="memory",unit="byte", %s}[%s])) by (resourcequota, namespace, uid, %s)`
  1899. cfg := pds.promConfig
  1900. durStr := timeutil.DurationString(end.Sub(start))
  1901. if durStr == "" {
  1902. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1903. }
  1904. queryResourceQuotaSpecRAMLimits := fmt.Sprintf(queryFmtResourceQuotaSpecRAMLimits, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1905. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryResourceQuotaSpecRAMLimits)
  1906. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1907. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryResourceQuotaSpecRAMLimits, end))
  1908. }
  1909. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaStatusUsedCPURequestAverage(start, end time.Time) *source.Future[source.ResourceResult] {
  1910. const queryName = "QueryResourceQuotaStatusUsedCPURequestAverage"
  1911. const queryFmtResourceQuotaStatusUsedCPURequests = `avg(avg_over_time(resourcequota_status_used_resource_requests{resource="cpu",unit="core", %s}[%s])) by (resourcequota, namespace, uid, %s)`
  1912. cfg := pds.promConfig
  1913. durStr := timeutil.DurationString(end.Sub(start))
  1914. if durStr == "" {
  1915. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1916. }
  1917. queryResourceQuotaStatusUsedCPURequests := fmt.Sprintf(queryFmtResourceQuotaStatusUsedCPURequests, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1918. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryResourceQuotaStatusUsedCPURequests)
  1919. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1920. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryResourceQuotaStatusUsedCPURequests, end))
  1921. }
  1922. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaStatusUsedCPURequestMax(start, end time.Time) *source.Future[source.ResourceResult] {
  1923. const queryName = "QueryResourceQuotaStatusUsedCPURequestMax"
  1924. const queryFmtResourceQuotaStatusUsedCPURequests = `max(max_over_time(resourcequota_status_used_resource_requests{resource="cpu",unit="core", %s}[%s])) by (resourcequota, namespace, uid, %s)`
  1925. cfg := pds.promConfig
  1926. durStr := timeutil.DurationString(end.Sub(start))
  1927. if durStr == "" {
  1928. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1929. }
  1930. queryResourceQuotaStatusUsedCPURequests := fmt.Sprintf(queryFmtResourceQuotaStatusUsedCPURequests, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1931. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryResourceQuotaStatusUsedCPURequests)
  1932. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1933. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryResourceQuotaStatusUsedCPURequests, end))
  1934. }
  1935. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaStatusUsedRAMRequestAverage(start, end time.Time) *source.Future[source.ResourceResult] {
  1936. const queryName = "QueryResourceQuotaStatusUsedRAMRequestAverage"
  1937. const queryFmtResourceQuotaStatusUsedRAMRequests = `avg(avg_over_time(resourcequota_status_used_resource_requests{resource="memory",unit="byte", %s}[%s])) by (resourcequota, namespace, uid, %s)`
  1938. cfg := pds.promConfig
  1939. durStr := timeutil.DurationString(end.Sub(start))
  1940. if durStr == "" {
  1941. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1942. }
  1943. queryResourceQuotaStatusUsedRAMRequests := fmt.Sprintf(queryFmtResourceQuotaStatusUsedRAMRequests, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1944. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryResourceQuotaStatusUsedRAMRequests)
  1945. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1946. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryResourceQuotaStatusUsedRAMRequests, end))
  1947. }
  1948. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaStatusUsedRAMRequestMax(start, end time.Time) *source.Future[source.ResourceResult] {
  1949. const queryName = "QueryResourceQuotaStatusUsedRAMRequestMax"
  1950. const queryFmtResourceQuotaStatusUsedRAMRequests = `max(max_over_time(resourcequota_status_used_resource_requests{resource="memory",unit="byte", %s}[%s])) by (resourcequota, namespace, uid, %s)`
  1951. cfg := pds.promConfig
  1952. durStr := timeutil.DurationString(end.Sub(start))
  1953. if durStr == "" {
  1954. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1955. }
  1956. queryResourceQuotaStatusUsedRAMRequests := fmt.Sprintf(queryFmtResourceQuotaStatusUsedRAMRequests, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1957. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryResourceQuotaStatusUsedRAMRequests)
  1958. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1959. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryResourceQuotaStatusUsedRAMRequests, end))
  1960. }
  1961. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaStatusUsedCPULimitAverage(start, end time.Time) *source.Future[source.ResourceResult] {
  1962. const queryName = "QueryResourceQuotaStatusUsedCPULimitAverage"
  1963. const queryFmtResourceQuotaStatusUsedCPULimits = `avg(avg_over_time(resourcequota_status_used_resource_limits{resource="cpu",unit="core", %s}[%s])) by (resourcequota, namespace, uid, %s)`
  1964. cfg := pds.promConfig
  1965. durStr := timeutil.DurationString(end.Sub(start))
  1966. if durStr == "" {
  1967. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1968. }
  1969. queryResourceQuotaStatusUsedCPULimits := fmt.Sprintf(queryFmtResourceQuotaStatusUsedCPULimits, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1970. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryResourceQuotaStatusUsedCPULimits)
  1971. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1972. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryResourceQuotaStatusUsedCPULimits, end))
  1973. }
  1974. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaStatusUsedCPULimitMax(start, end time.Time) *source.Future[source.ResourceResult] {
  1975. const queryName = "QueryResourceQuotaStatusUsedCPULimitMax"
  1976. const queryFmtResourceQuotaStatusUsedCPULimits = `max(max_over_time(resourcequota_status_used_resource_limits{resource="cpu",unit="core", %s}[%s])) by (resourcequota, namespace, uid, %s)`
  1977. cfg := pds.promConfig
  1978. durStr := timeutil.DurationString(end.Sub(start))
  1979. if durStr == "" {
  1980. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1981. }
  1982. queryResourceQuotaStatusUsedCPULimits := fmt.Sprintf(queryFmtResourceQuotaStatusUsedCPULimits, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1983. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryResourceQuotaStatusUsedCPULimits)
  1984. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1985. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryResourceQuotaStatusUsedCPULimits, end))
  1986. }
  1987. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaStatusUsedRAMLimitAverage(start, end time.Time) *source.Future[source.ResourceResult] {
  1988. const queryName = "QueryResourceQuotaStatusUsedRAMLimitAverage"
  1989. const queryFmtResourceQuotaStatusUsedRAMLimits = `avg(avg_over_time(resourcequota_status_used_resource_limits{resource="memory",unit="byte", %s}[%s])) by (resourcequota, namespace, uid, %s)`
  1990. cfg := pds.promConfig
  1991. durStr := timeutil.DurationString(end.Sub(start))
  1992. if durStr == "" {
  1993. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  1994. }
  1995. queryResourceQuotaStatusUsedRAMLimits := fmt.Sprintf(queryFmtResourceQuotaStatusUsedRAMLimits, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1996. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryResourceQuotaStatusUsedRAMLimits)
  1997. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  1998. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryResourceQuotaStatusUsedRAMLimits, end))
  1999. }
  2000. func (pds *PrometheusMetricsQuerier) QueryResourceQuotaStatusUsedRAMLimitMax(start, end time.Time) *source.Future[source.ResourceResult] {
  2001. const queryName = "QueryResourceQuotaStatusUsedRAMLimitMax"
  2002. const queryFmtResourceQuotaStatusUsedRAMLimits = `max(max_over_time(resourcequota_status_used_resource_limits{resource="memory",unit="byte", %s}[%s])) by (resourcequota, namespace, uid, %s)`
  2003. cfg := pds.promConfig
  2004. durStr := timeutil.DurationString(end.Sub(start))
  2005. if durStr == "" {
  2006. panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
  2007. }
  2008. queryResourceQuotaStatusUsedRAMLimits := fmt.Sprintf(queryFmtResourceQuotaStatusUsedRAMLimits, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  2009. log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryResourceQuotaStatusUsedRAMLimits)
  2010. ctx := pds.promContexts.NewNamedContext(KubeModelContextName)
  2011. return source.NewFuture(source.DecodeResourceResult, ctx.QueryAtTime(queryResourceQuotaStatusUsedRAMLimits, end))
  2012. }
  2013. func (pds *PrometheusMetricsQuerier) QueryDataCoverage(limitDays int) (time.Time, time.Time, error) {
  2014. const (
  2015. queryName = "QueryDataCoverage"
  2016. queryFmtOldestSample = `min_over_time(timestamp(group(node_cpu_hourly_cost{%s}))[%s:%s])`
  2017. queryFmtNewestSample = `max_over_time(timestamp(group(node_cpu_hourly_cost{%s}))[%s:%s])`
  2018. )
  2019. cfg := pds.promConfig
  2020. minutesPerDuration := 60
  2021. dur := time.Duration(limitDays) * timeutil.Day
  2022. end := time.Now().UTC().Truncate(timeutil.Day).Add(timeutil.Day)
  2023. start := end.Add(-dur)
  2024. durStr := pds.durationStringFor(start, end, minutesPerDuration, false)
  2025. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  2026. queryOldest := fmt.Sprintf(queryFmtOldestSample, cfg.ClusterFilter, durStr, "1h")
  2027. log.Debugf("[Prometheus][%s[Oldest]][At Time: %d]: %s", queryName, end.Unix(), queryOldest)
  2028. resOldestFut := ctx.QueryAtTime(queryOldest, end)
  2029. resOldest, err := resOldestFut.Await()
  2030. if err != nil {
  2031. return time.Time{}, time.Time{}, fmt.Errorf("querying oldest sample: %w", err)
  2032. }
  2033. if len(resOldest) == 0 || len(resOldest[0].Values) == 0 {
  2034. // If node_cpu_hourly_cost metric is not available, fallback to a reasonable time range
  2035. // This prevents CSV export from failing when the metric doesn't exist yet
  2036. log.Warnf("QueryDataCoverage: node_cpu_hourly_cost metric not available, using fallback time range")
  2037. // Use a reasonable fallback: start from 1 day ago to account for metric collection delay
  2038. fallbackEnd := time.Now().UTC().Truncate(timeutil.Day)
  2039. fallbackStart := fallbackEnd.AddDate(0, 0, -1) // 1 day ago
  2040. return fallbackStart, fallbackEnd, nil
  2041. }
  2042. oldest := time.Unix(int64(resOldest[0].Values[0].Value), 0)
  2043. queryNewest := fmt.Sprintf(queryFmtNewestSample, cfg.ClusterFilter, durStr, "1h")
  2044. log.Debugf("[Prometheus][%s[Newest]][At Time: %d]: %s", queryName, end.Unix(), queryNewest)
  2045. resNewestFut := ctx.QueryAtTime(queryNewest, end)
  2046. resNewest, err := resNewestFut.Await()
  2047. if err != nil {
  2048. return time.Time{}, time.Time{}, fmt.Errorf("querying newest sample: %w", err)
  2049. }
  2050. if len(resNewest) == 0 || len(resNewest[0].Values) == 0 {
  2051. // If newest query fails but oldest succeeded, use oldest as both start and end
  2052. // This allows CSV export to proceed with at least some time range
  2053. log.Warnf("QueryDataCoverage: newest sample query returned no results, using oldest timestamp")
  2054. return oldest, oldest, nil
  2055. }
  2056. newest := time.Unix(int64(resNewest[0].Values[0].Value), 0)
  2057. return oldest, newest, nil
  2058. }
  2059. // durationStringFor simplifies the determination of query duration based on the version of prom and if the function
  2060. // in the query needs all data points in the vector it is provided or if it will extrapolate its own. Functions
  2061. // that extrapolate will add on another resolution if given a duration that is one resolution longer than the intended
  2062. // duration.
  2063. func (pds *PrometheusMetricsQuerier) durationStringFor(start, end time.Time, minsPerResolution int, extrapolated bool) string {
  2064. dur := end.Sub(start)
  2065. // If using a version of Prometheus where the resolution needs duration offset,
  2066. // we need to apply that here.
  2067. //
  2068. // E.g. avg(node_total_hourly_cost{}) by (node, provider_id)[60m:5m] with
  2069. // time=01:00:00 will return, for a node running the entire time, 12
  2070. // timestamps where the first is 00:05:00 and the last is 01:00:00.
  2071. // However, OpenCost expects for there to be 13 timestamps where the first
  2072. // begins at 00:00:00. To achieve this, we must modify our query to
  2073. // avg(node_total_hourly_cost{}) by (node, provider_id)[65m:5m]
  2074. if pds.promConfig.IsOffsetResolution && !extrapolated {
  2075. // increase the query time by the resolution
  2076. dur = dur + (time.Duration(minsPerResolution) * time.Minute)
  2077. }
  2078. return timeutil.DurationString(dur)
  2079. }