datasource.go 95 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374
  1. package prom
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "net/http"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "github.com/julienschmidt/httprouter"
  11. "github.com/opencost/opencost/modules/prometheus-source/pkg/env"
  12. "github.com/opencost/opencost/core/pkg/clusters"
  13. "github.com/opencost/opencost/core/pkg/log"
  14. "github.com/opencost/opencost/core/pkg/protocol"
  15. "github.com/opencost/opencost/core/pkg/source"
  16. "github.com/opencost/opencost/core/pkg/util/httputil"
  17. "github.com/opencost/opencost/core/pkg/util/json"
  18. "github.com/opencost/opencost/core/pkg/util/timeutil"
  19. prometheus "github.com/prometheus/client_golang/api"
  20. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  21. )
  // Paths of the Prometheus HTTP API endpoints used by this package. Every
  // endpoint is rooted at apiPrefix.
  const (
  	// apiPrefix is the versioned root shared by all endpoints below.
  	apiPrefix = "/api/v1"

  	// Discovery and metadata endpoints.
  	epAlertManagers = apiPrefix + "/alertmanagers"
  	epLabelValues   = apiPrefix + "/label/:name/values"
  	epSeries        = apiPrefix + "/series"
  	epTargets       = apiPrefix + "/targets"
  	epRules         = apiPrefix + "/rules"

  	// Server status endpoints.
  	epConfig = apiPrefix + "/status/config"
  	epFlags  = apiPrefix + "/status/flags"

  	// TSDB administration endpoints.
  	epSnapshot        = apiPrefix + "/admin/tsdb/snapshot"
  	epDeleteSeries    = apiPrefix + "/admin/tsdb/delete_series"
  	epCleanTombstones = apiPrefix + "/admin/tsdb/clean_tombstones"
  )
  35. // helper for query range proxy requests
  36. func toStartEndStep(qp httputil.QueryParams) (start, end time.Time, step time.Duration, err error) {
  37. var e error
  38. ss := qp.Get("start", "")
  39. es := qp.Get("end", "")
  40. ds := qp.Get("duration", "")
  41. layout := "2006-01-02T15:04:05.000Z"
  42. start, e = time.Parse(layout, ss)
  43. if e != nil {
  44. err = fmt.Errorf("Error parsing time %s. Error: %s", ss, err)
  45. return
  46. }
  47. end, e = time.Parse(layout, es)
  48. if e != nil {
  49. err = fmt.Errorf("Error parsing time %s. Error: %s", es, err)
  50. return
  51. }
  52. step, e = time.ParseDuration(ds)
  53. if e != nil {
  54. err = fmt.Errorf("Error parsing duration %s. Error: %s", ds, err)
  55. return
  56. }
  57. err = nil
  58. return
  59. }
  60. // FIXME: Before merge, implement a more robust design. This is brittle and bug-prone,
  61. // FIXME: but decouples the prom requirements from the Provider implementations.
  62. var providerStorageQueries = map[string]func(config *OpenCostPrometheusConfig, start, end time.Time, rate bool, used bool) string{
  63. "aws": func(config *OpenCostPrometheusConfig, start, end time.Time, rate bool, used bool) string {
  64. return ""
  65. },
  66. "gcp": func(config *OpenCostPrometheusConfig, start, end time.Time, rate bool, used bool) string {
  67. // TODO Set to the price for the appropriate storage class. It's not trivial to determine the local storage disk type
  68. // See https://cloud.google.com/compute/disks-image-pricing#persistentdisk
  69. localStorageCost := 0.04
  70. baseMetric := "container_fs_limit_bytes"
  71. if used {
  72. baseMetric = "container_fs_usage_bytes"
  73. }
  74. fmtCumulativeQuery := `sum(
  75. sum_over_time(%s{device!="tmpfs", id="/", %s}[%s:1m])
  76. ) by (%s) / 60 / 730 / 1024 / 1024 / 1024 * %f`
  77. fmtMonthlyQuery := `sum(
  78. avg_over_time(%s{device!="tmpfs", id="/", %s}[%s:1m])
  79. ) by (%s) / 1024 / 1024 / 1024 * %f`
  80. fmtQuery := fmtCumulativeQuery
  81. if rate {
  82. fmtQuery = fmtMonthlyQuery
  83. }
  84. fmtWindow := timeutil.DurationString(end.Sub(start))
  85. return fmt.Sprintf(fmtQuery, baseMetric, config.ClusterFilter, fmtWindow, config.ClusterLabel, localStorageCost)
  86. },
  87. "azure": func(config *OpenCostPrometheusConfig, start, end time.Time, rate bool, used bool) string {
  88. return ""
  89. },
  90. "alibaba": func(config *OpenCostPrometheusConfig, start, end time.Time, rate bool, used bool) string {
  91. return ""
  92. },
  93. "scaleway": func(config *OpenCostPrometheusConfig, start, end time.Time, rate bool, used bool) string {
  94. return ""
  95. },
  96. "otc": func(config *OpenCostPrometheusConfig, start, end time.Time, rate bool, used bool) string {
  97. return ""
  98. },
  99. "oracle": func(config *OpenCostPrometheusConfig, start, end time.Time, rate bool, used bool) string {
  100. return ""
  101. },
  102. "csv": func(config *OpenCostPrometheusConfig, start, end time.Time, rate bool, used bool) string {
  103. return ""
  104. },
  105. "custom": func(config *OpenCostPrometheusConfig, start, end time.Time, rate bool, used bool) string {
  106. return ""
  107. },
  108. }
  109. // creates a new help error which indicates the caller can retry and is non-fatal.
  110. func newHelpRetryError(format string, args ...any) error {
  111. formatWithHelp := format + "\nTroubleshooting help available at: %s"
  112. args = append(args, PrometheusTroubleshootingURL)
  113. cause := fmt.Errorf(formatWithHelp, args...)
  114. return source.NewHelpRetryError(cause)
  115. }
  116. // PrometheusDataSource is the OpenCost data source implementation leveraging Prometheus. Prometheus provides longer retention periods and
  117. // more detailed metrics than the OpenCost Collector, which is useful for historical analysis and cost forecasting.
  118. type PrometheusDataSource struct {
  119. promConfig *OpenCostPrometheusConfig
  120. promClient prometheus.Client
  121. promContexts *ContextFactory
  122. thanosConfig *OpenCostThanosConfig
  123. thanosClient prometheus.Client
  124. thanosContexts *ContextFactory
  125. }
  126. // NewDefaultPrometheusDataSource creates and initializes a new `PrometheusDataSource` with configuration
  127. // parsed from environment variables. This function will block until a connection to prometheus is established,
  128. // or fails. It is recommended to run this function in a goroutine on a retry cycle.
  129. func NewDefaultPrometheusDataSource() (*PrometheusDataSource, error) {
  130. config, err := NewOpenCostPrometheusConfigFromEnv()
  131. if err != nil {
  132. return nil, fmt.Errorf("failed to create prometheus config from env: %w", err)
  133. }
  134. var thanosConfig *OpenCostThanosConfig
  135. if env.IsThanosEnabled() {
  136. // thanos initialization is not fatal, so we log the error and continue
  137. thanosConfig, err = NewOpenCostThanosConfigFromEnv()
  138. if err != nil {
  139. log.Warnf("Thanos was enabled, but failed to create thanos config from env: %s. Continuing...", err.Error())
  140. }
  141. }
  142. return NewPrometheusDataSource(config, thanosConfig)
  143. }
  144. // NewPrometheusDataSource initializes clients for Prometheus and Thanos, and returns a new PrometheusDataSource.
  145. func NewPrometheusDataSource(promConfig *OpenCostPrometheusConfig, thanosConfig *OpenCostThanosConfig) (*PrometheusDataSource, error) {
  146. promClient, err := NewPrometheusClient(promConfig.ServerEndpoint, promConfig.ClientConfig)
  147. if err != nil {
  148. return nil, fmt.Errorf("failed to build prometheus client: %w", err)
  149. }
  150. // validation of the prometheus client
  151. m, err := Validate(promClient, promConfig)
  152. if err != nil || !m.Running {
  153. if err != nil {
  154. return nil, newHelpRetryError("failed to query prometheus at %s: %w", promConfig.ServerEndpoint, err)
  155. } else if !m.Running {
  156. return nil, newHelpRetryError("prometheus at %s is not running", promConfig.ServerEndpoint)
  157. }
  158. } else {
  159. log.Infof("Success: retrieved the 'up' query against prometheus at: %s", promConfig.ServerEndpoint)
  160. }
  161. // we don't consider this a fatal error, but we log for visibility
  162. api := prometheusAPI.NewAPI(promClient)
  163. _, err = api.Buildinfo(context.Background())
  164. if err != nil {
  165. log.Infof("No valid prometheus config file at %s. Error: %s.\nTroubleshooting help available at: %s.\n**Ignore if using cortex/mimir/thanos here**", promConfig.ServerEndpoint, err.Error(), PrometheusTroubleshootingURL)
  166. } else {
  167. log.Infof("Retrieved a prometheus config file from: %s", promConfig.ServerEndpoint)
  168. }
  169. // Fix scrape interval if zero by attempting to lookup the interval for the configured job
  170. if promConfig.ScrapeInterval == 0 {
  171. promConfig.ScrapeInterval = time.Minute
  172. // Lookup scrape interval for kubecost job, update if found
  173. si, err := ScrapeIntervalFor(promClient, promConfig.JobName)
  174. if err == nil {
  175. promConfig.ScrapeInterval = si
  176. }
  177. }
  178. log.Infof("Using scrape interval of %f", promConfig.ScrapeInterval.Seconds())
  179. promContexts := NewContextFactory(promClient, promConfig)
  180. var thanosClient prometheus.Client
  181. var thanosContexts *ContextFactory
  182. // if the thanos configuration is non-nil, we assume intent to use thanos. However, failure to
  183. // initialize the thanos client is not fatal, and we will log the error and continue.
  184. if thanosConfig != nil {
  185. thanosHost := thanosConfig.ServerEndpoint
  186. if thanosHost != "" {
  187. thanosCli, _ := NewThanosClient(thanosHost, thanosConfig)
  188. _, err = Validate(thanosCli, thanosConfig.OpenCostPrometheusConfig)
  189. if err != nil {
  190. log.Warnf("Failed to query Thanos at %s. Error: %s.", thanosHost, err.Error())
  191. thanosClient = thanosCli
  192. } else {
  193. log.Infof("Success: retrieved the 'up' query against Thanos at: %s", thanosHost)
  194. thanosClient = thanosCli
  195. }
  196. thanosContexts = NewContextFactory(thanosClient, thanosConfig.OpenCostPrometheusConfig)
  197. } else {
  198. log.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
  199. }
  200. }
  201. return &PrometheusDataSource{
  202. promConfig: promConfig,
  203. promClient: promClient,
  204. promContexts: promContexts,
  205. thanosConfig: thanosConfig,
  206. thanosClient: thanosClient,
  207. thanosContexts: thanosContexts,
  208. }, nil
  209. }
  210. var proto = protocol.HTTP()
  211. // prometheusMetadata returns the metadata for the prometheus server
  212. func (pds *PrometheusDataSource) prometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  213. w.Header().Set("Content-Type", "application/json")
  214. w.Header().Set("Access-Control-Allow-Origin", "*")
  215. resp := proto.ToResponse(Validate(pds.promClient, pds.promConfig))
  216. proto.WriteResponse(w, resp)
  217. }
  218. // prometheusRecordingRules is a proxy for /rules against prometheus
  219. func (pds *PrometheusDataSource) prometheusRecordingRules(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  220. w.Header().Set("Content-Type", "application/json")
  221. w.Header().Set("Access-Control-Allow-Origin", "*")
  222. u := pds.promClient.URL(epRules, nil)
  223. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  224. if err != nil {
  225. fmt.Fprintf(w, "error creating Prometheus rule request: %s", err)
  226. return
  227. }
  228. _, body, err := pds.promClient.Do(r.Context(), req)
  229. if err != nil {
  230. fmt.Fprintf(w, "error making Prometheus rule request: %s", err)
  231. return
  232. }
  233. w.Write(body)
  234. }
  235. // prometheusConfig returns the current configuration of the prometheus server
  236. func (pds *PrometheusDataSource) prometheusConfig(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  237. w.Header().Set("Content-Type", "application/json")
  238. w.Header().Set("Access-Control-Allow-Origin", "*")
  239. pConfig := map[string]string{
  240. "address": pds.promConfig.ServerEndpoint,
  241. }
  242. body, err := json.Marshal(pConfig)
  243. if err != nil {
  244. fmt.Fprintf(w, "Error marshalling prometheus config")
  245. } else {
  246. w.Write(body)
  247. }
  248. }
  249. // prometheusTargets is a proxy for /targets against prometheus
  250. func (pds *PrometheusDataSource) prometheusTargets(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  251. w.Header().Set("Content-Type", "application/json")
  252. w.Header().Set("Access-Control-Allow-Origin", "*")
  253. u := pds.promClient.URL(epTargets, nil)
  254. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  255. if err != nil {
  256. fmt.Fprintf(w, "error creating Prometheus rule request: %s", err)
  257. return
  258. }
  259. _, body, err := pds.promClient.Do(r.Context(), req)
  260. if err != nil {
  261. fmt.Fprintf(w, "error making Prometheus rule request: %s", err)
  262. return
  263. }
  264. w.Write(body)
  265. }
  266. // status returns the status of the prometheus client
  267. func (pds *PrometheusDataSource) status(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  268. w.Header().Set("Content-Type", "application/json")
  269. w.Header().Set("Access-Control-Allow-Origin", "*")
  270. promServer := pds.promConfig.ServerEndpoint
  271. api := prometheusAPI.NewAPI(pds.promClient)
  272. result, err := api.Buildinfo(r.Context())
  273. if err != nil {
  274. fmt.Fprintf(w, "Using Prometheus at %s, Error: %s", promServer, err)
  275. } else {
  276. fmt.Fprintf(w, "Using Prometheus at %s, version: %s", promServer, result.Version)
  277. }
  278. }
  279. // prometheusQuery is a proxy for /query against prometheus
  280. func (pds *PrometheusDataSource) prometheusQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  281. w.Header().Set("Content-Type", "application/json")
  282. w.Header().Set("Access-Control-Allow-Origin", "*")
  283. qp := httputil.NewQueryParams(r.URL.Query())
  284. query := qp.Get("query", "")
  285. if query == "" {
  286. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  287. return
  288. }
  289. // Attempt to parse time as either a unix timestamp or as an RFC3339 value
  290. var timeVal time.Time
  291. timeStr := qp.Get("time", "")
  292. if len(timeStr) > 0 {
  293. if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
  294. timeVal = time.Unix(t, 0)
  295. } else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
  296. timeVal = t
  297. }
  298. // If time is given, but not parse-able, return an error
  299. if timeVal.IsZero() {
  300. http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
  301. }
  302. }
  303. ctx := pds.promContexts.NewNamedContext(FrontendContextName)
  304. body, err := ctx.RawQuery(query, timeVal)
  305. if err != nil {
  306. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  307. return
  308. }
  309. w.Write(body) // prometheusQueryRange is a proxy for /query_range against prometheus
  310. }
  311. func (pds *PrometheusDataSource) prometheusQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  312. w.Header().Set("Content-Type", "application/json")
  313. w.Header().Set("Access-Control-Allow-Origin", "*")
  314. qp := httputil.NewQueryParams(r.URL.Query())
  315. query := qp.Get("query", "")
  316. if query == "" {
  317. fmt.Fprintf(w, "Error parsing query from request parameters.")
  318. return
  319. }
  320. start, end, duration, err := toStartEndStep(qp)
  321. if err != nil {
  322. fmt.Fprintf(w, "error: %s", err)
  323. return
  324. }
  325. ctx := pds.promContexts.NewNamedContext(FrontendContextName)
  326. body, err := ctx.RawQueryRange(query, start, end, duration)
  327. if err != nil {
  328. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  329. return
  330. }
  331. w.Write(body)
  332. }
  333. // thanosQuery is a proxy for /query against thanos
  334. func (pds *PrometheusDataSource) thanosQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  335. w.Header().Set("Content-Type", "application/json")
  336. w.Header().Set("Access-Control-Allow-Origin", "*")
  337. if pds.thanosClient == nil {
  338. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("ThanosDisabled")))
  339. return
  340. }
  341. qp := httputil.NewQueryParams(r.URL.Query())
  342. query := qp.Get("query", "")
  343. if query == "" {
  344. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  345. return
  346. }
  347. // Attempt to parse time as either a unix timestamp or as an RFC3339 value
  348. var timeVal time.Time
  349. timeStr := qp.Get("time", "")
  350. if len(timeStr) > 0 {
  351. if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
  352. timeVal = time.Unix(t, 0)
  353. } else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
  354. timeVal = t
  355. }
  356. // If time is given, but not parse-able, return an error
  357. if timeVal.IsZero() {
  358. http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
  359. }
  360. }
  361. ctx := pds.thanosContexts.NewNamedContext(FrontendContextName)
  362. body, err := ctx.RawQuery(query, timeVal)
  363. if err != nil {
  364. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  365. return
  366. }
  367. w.Write(body)
  368. }
  369. // thanosQueryRange is a proxy for /query_range against thanos
  370. func (pds *PrometheusDataSource) thanosQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  371. w.Header().Set("Content-Type", "application/json")
  372. w.Header().Set("Access-Control-Allow-Origin", "*")
  373. if pds.thanosClient == nil {
  374. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("ThanosDisabled")))
  375. return
  376. }
  377. qp := httputil.NewQueryParams(r.URL.Query())
  378. query := qp.Get("query", "")
  379. if query == "" {
  380. fmt.Fprintf(w, "Error parsing query from request parameters.")
  381. return
  382. }
  383. start, end, duration, err := toStartEndStep(qp)
  384. if err != nil {
  385. fmt.Fprintf(w, "error: %s", err)
  386. return
  387. }
  388. ctx := pds.thanosContexts.NewNamedContext(FrontendContextName)
  389. body, err := ctx.RawQueryRange(query, start, end, duration)
  390. if err != nil {
  391. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  392. return
  393. }
  394. w.Write(body)
  395. }
  396. // promtheusQueueState returns the current state of the prometheus and thanos request queues
  397. func (pds *PrometheusDataSource) prometheusQueueState(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  398. w.Header().Set("Content-Type", "application/json")
  399. w.Header().Set("Access-Control-Allow-Origin", "*")
  400. promQueueState, err := GetPrometheusQueueState(pds.promClient, pds.promConfig)
  401. if err != nil {
  402. proto.WriteResponse(w, proto.ToResponse(nil, err))
  403. return
  404. }
  405. result := map[string]*PrometheusQueueState{
  406. "prometheus": promQueueState,
  407. }
  408. if pds.thanosClient != nil {
  409. thanosQueueState, err := GetPrometheusQueueState(pds.thanosClient, pds.thanosConfig.OpenCostPrometheusConfig)
  410. if err != nil {
  411. log.Warnf("Error getting Thanos queue state: %s", err)
  412. } else {
  413. result["thanos"] = thanosQueueState
  414. }
  415. }
  416. proto.WriteResponse(w, proto.ToResponse(result, nil))
  417. }
  418. // prometheusMetrics retrieves availability of Prometheus and Thanos metrics
  419. func (pds *PrometheusDataSource) prometheusMetrics(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  420. w.Header().Set("Content-Type", "application/json")
  421. w.Header().Set("Access-Control-Allow-Origin", "*")
  422. promMetrics := GetPrometheusMetrics(pds.promClient, pds.promConfig, "")
  423. result := map[string][]*PrometheusDiagnostic{
  424. "prometheus": promMetrics,
  425. }
  426. if pds.thanosClient != nil {
  427. thanosMetrics := GetPrometheusMetrics(pds.thanosClient, pds.thanosConfig.OpenCostPrometheusConfig, pds.thanosConfig.Offset)
  428. result["thanos"] = thanosMetrics
  429. }
  430. proto.WriteResponse(w, proto.ToResponse(result, nil))
  431. }
  432. func (pds *PrometheusDataSource) PrometheusClient() prometheus.Client {
  433. return pds.promClient
  434. }
  435. func (pds *PrometheusDataSource) PrometheusConfig() *OpenCostPrometheusConfig {
  436. return pds.promConfig
  437. }
  438. func (pds *PrometheusDataSource) PrometheusContexts() *ContextFactory {
  439. return pds.promContexts
  440. }
  441. func (pds *PrometheusDataSource) ThanosClient() prometheus.Client {
  442. return pds.thanosClient
  443. }
  444. func (pds *PrometheusDataSource) ThanosConfig() *OpenCostThanosConfig {
  445. return pds.thanosConfig
  446. }
  447. func (pds *PrometheusDataSource) ThanosContexts() *ContextFactory {
  448. return pds.thanosContexts
  449. }
  450. func (pds *PrometheusDataSource) NewClusterMap(clusterInfoProvider clusters.ClusterInfoProvider) clusters.ClusterMap {
  451. if pds.thanosClient != nil {
  452. return newPrometheusClusterMap(pds.thanosContexts, clusterInfoProvider, 10*time.Minute)
  453. }
  454. return newPrometheusClusterMap(pds.promContexts, clusterInfoProvider, 5*time.Minute)
  455. }
  456. func (pds *PrometheusDataSource) RegisterEndPoints(router *httprouter.Router) {
  457. // endpoints migrated from server
  458. router.GET("/validatePrometheus", pds.prometheusMetadata)
  459. router.GET("/prometheusRecordingRules", pds.prometheusRecordingRules)
  460. router.GET("/prometheusConfig", pds.prometheusConfig)
  461. router.GET("/prometheusTargets", pds.prometheusTargets)
  462. router.GET("/status", pds.status)
  463. // prom query proxies
  464. router.GET("/prometheusQuery", pds.prometheusQuery)
  465. router.GET("/prometheusQueryRange", pds.prometheusQueryRange)
  466. router.GET("/thanosQuery", pds.thanosQuery)
  467. router.GET("/thanosQueryRange", pds.thanosQueryRange)
  468. // diagnostics
  469. router.GET("/diagnostics/requestQueue", pds.prometheusQueueState)
  470. router.GET("/diagnostics/prometheusMetrics", pds.prometheusMetrics)
  471. }
  472. func (pds *PrometheusDataSource) RefreshInterval() time.Duration {
  473. return pds.promConfig.ScrapeInterval
  474. }
  475. func (pds *PrometheusDataSource) BatchDuration() time.Duration {
  476. return pds.promConfig.MaxQueryDuration
  477. }
  478. func (pds *PrometheusDataSource) Resolution() time.Duration {
  479. return pds.promConfig.DataResolution
  480. }
  481. func (pds *PrometheusDataSource) MetaData() map[string]string {
  482. thanosEnabled := pds.thanosClient != nil
  483. metadata := map[string]string{
  484. clusters.ClusterInfoThanosEnabledKey: fmt.Sprintf("%t", thanosEnabled),
  485. }
  486. if thanosEnabled {
  487. metadata[clusters.ClusterInfoThanosOffsetKey] = pds.thanosConfig.Offset
  488. }
  489. return metadata
  490. }
  491. //--------------------------------------------------------------------------
  492. // InstantMetricsQuerier
  493. //--------------------------------------------------------------------------
  494. func (pds *PrometheusDataSource) QueryPVCost(start, end time.Time) source.QueryResultsChan {
  495. const pvCostQuery = `avg(avg_over_time(pv_hourly_cost{%s}[%s])) by (%s, persistentvolume,provider_id)`
  496. durStr := timeutil.DurationString(end.Sub(start))
  497. if durStr == "" {
  498. panic("failed to parse duration string passed to QueryPVCost")
  499. }
  500. queryPVCost := fmt.Sprintf(pvCostQuery, pds.promConfig.ClusterFilter, durStr, pds.promConfig.ClusterLabel)
  501. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  502. return ctx.QueryAtTime(queryPVCost, end)
  503. }
  504. func (pds *PrometheusDataSource) QueryPVSize(start, end time.Time) source.QueryResultsChan {
  505. const pvSizeQuery = `avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s])) by (%s, persistentvolume)`
  506. cfg := pds.promConfig
  507. durStr := timeutil.DurationString(end.Sub(start))
  508. if durStr == "" {
  509. panic("failed to parse duration string passed to QueryPVCost")
  510. }
  511. queryPVSize := fmt.Sprintf(pvSizeQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  512. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  513. return ctx.QueryAtTime(queryPVSize, end)
  514. }
  515. func (pds *PrometheusDataSource) QueryPVStorageClass(start, end time.Time) source.QueryResultsChan {
  516. // `avg(avg_over_time(kubecost_pv_info{%s}[%s])) by (%s, persistentvolume, storageclass)`
  517. // , env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  518. const pvStorageSizeQuery = `avg(avg_over_time(kubecost_pv_info{%s}[%s])) by (%s, persistentvolume, storageclass)`
  519. cfg := pds.promConfig
  520. durStr := timeutil.DurationString(end.Sub(start))
  521. if durStr == "" {
  522. panic("failed to parse duration string passed to QueryPVStorageClass")
  523. }
  524. queryPVStorageClass := fmt.Sprintf(pvStorageSizeQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  525. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  526. return ctx.QueryAtTime(queryPVStorageClass, end)
  527. }
  528. func (pds *PrometheusDataSource) QueryPVUsedAverage(start, end time.Time) source.QueryResultsChan {
  529. // `avg(avg_over_time(kubelet_volume_stats_used_bytes{%s}[%s])) by (%s, persistentvolumeclaim, namespace)`
  530. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  531. const pvUsedAverageQuery = `avg(avg_over_time(kubelet_volume_stats_used_bytes{%s}[%s])) by (%s, persistentvolumeclaim, namespace)`
  532. cfg := pds.promConfig
  533. durStr := timeutil.DurationString(end.Sub(start))
  534. if durStr == "" {
  535. panic("failed to parse duration string passed to QueryPVUsedAverage")
  536. }
  537. queryPVUsedAvg := fmt.Sprintf(pvUsedAverageQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  538. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  539. return ctx.QueryAtTime(queryPVUsedAvg, end)
  540. }
  541. func (pds *PrometheusDataSource) QueryPVUsedMax(start, end time.Time) source.QueryResultsChan {
  542. // `max(max_over_time(kubelet_volume_stats_used_bytes{%s}[%s])) by (%s, persistentvolumeclaim, namespace)`
  543. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  544. const pvUsedMaxQuery = `max(max_over_time(kubelet_volume_stats_used_bytes{%s}[%s])) by (%s, persistentvolumeclaim, namespace)`
  545. cfg := pds.promConfig
  546. durStr := timeutil.DurationString(end.Sub(start))
  547. if durStr == "" {
  548. panic("failed to parse duration string passed to QueryPVUsedMax")
  549. }
  550. queryPVUsedMax := fmt.Sprintf(pvUsedMaxQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  551. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  552. return ctx.QueryAtTime(queryPVUsedMax, end)
  553. }
  554. func (pds *PrometheusDataSource) QueryPVCInfo(start, end time.Time) source.QueryResultsChan {
  555. // `avg(avg_over_time(kube_persistentvolumeclaim_info{%s}[%s])) by (%s, volumename, persistentvolumeclaim, namespace)`
  556. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  557. const pvcInfoQuery = `avg(avg_over_time(kube_persistentvolumeclaim_info{%s}[%s])) by (%s, volumename, persistentvolumeclaim, namespace)`
  558. cfg := pds.promConfig
  559. durStr := timeutil.DurationString(end.Sub(start))
  560. if durStr == "" {
  561. panic("failed to parse duration string passed to QueryPVCInfo")
  562. }
  563. queryPVCInfo := fmt.Sprintf(pvcInfoQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  564. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  565. return ctx.QueryAtTime(queryPVCInfo, end)
  566. }
  567. func (pds *PrometheusDataSource) QueryPVActiveMinutes(start, end time.Time) source.QueryResultsChan {
  568. // `avg(kube_persistentvolume_capacity_bytes{%s}) by (%s, persistentvolume)[%s:%dm]`
  569. // env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  570. const pvActiveMinsQuery = `avg(kube_persistentvolume_capacity_bytes{%s}) by (%s, persistentvolume)[%s:%dm]`
  571. cfg := pds.promConfig
  572. minsPerResolution := cfg.DataResolutionMinutes
  573. durStr := timeutil.DurationString(end.Sub(start))
  574. if durStr == "" {
  575. panic("failed to parse duration string passed to QueryPVActiveMinutes")
  576. }
  577. queryPVActiveMins := fmt.Sprintf(pvActiveMinsQuery, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  578. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  579. return ctx.QueryAtTime(queryPVActiveMins, end)
  580. }
  581. func (pds *PrometheusDataSource) QueryLocalStorageCost(start, end time.Time) source.QueryResultsChan {
  582. // `sum_over_time(sum(container_fs_limit_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`
  583. // env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  584. const localStorageCostQuery = `sum_over_time(sum(container_fs_limit_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`
  585. cfg := pds.promConfig
  586. resolution := cfg.DataResolution
  587. durStr := timeutil.DurationString(end.Sub(start))
  588. if durStr == "" {
  589. panic("failed to parse duration string passed to QueryLocalStorageCost")
  590. }
  591. //Ensuring if data resolution is less than 60s default it to 1m
  592. var minsPerResolution int
  593. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  594. minsPerResolution = 1
  595. log.DedupedWarningf(3, "QueryLocalStorageCost: Configured resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  596. }
  597. // hourlyToCumulative is a scaling factor that, when multiplied by an
  598. // hourly value, converts it to a cumulative value; i.e. [$/hr] *
  599. // [min/res]*[hr/min] = [$/res]
  600. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  601. costPerGBHr := 0.04 / 730.0
  602. queryLocalStorageCost := fmt.Sprintf(localStorageCostQuery, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  603. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  604. return ctx.QueryAtTime(queryLocalStorageCost, end)
  605. }
  606. func (pds *PrometheusDataSource) QueryLocalStorageUsedCost(start, end time.Time) source.QueryResultsChan {
  607. // `sum_over_time(sum(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`
  608. // env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  609. const localStorageUsedCostQuery = `sum_over_time(sum(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`
  610. cfg := pds.promConfig
  611. minsPerResolution := cfg.DataResolutionMinutes
  612. durStr := timeutil.DurationString(end.Sub(start))
  613. if durStr == "" {
  614. panic("failed to parse duration string passed to QueryLocalStorageUsedCost")
  615. }
  616. // hourlyToCumulative is a scaling factor that, when multiplied by an
  617. // hourly value, converts it to a cumulative value; i.e. [$/hr] *
  618. // [min/res]*[hr/min] = [$/res]
  619. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  620. costPerGBHr := 0.04 / 730.0
  621. queryLocalStorageUsedCost := fmt.Sprintf(localStorageUsedCostQuery, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  622. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  623. return ctx.QueryAtTime(queryLocalStorageUsedCost, end)
  624. }
  625. func (pds *PrometheusDataSource) QueryLocalStorageUsedAvg(start, end time.Time) source.QueryResultsChan {
  626. // `avg(sum(avg_over_time(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}[%s])) by (instance, device, %s, job)) by (instance, device, %s)`
  627. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  628. const localStorageUsedAvgQuery = `avg(sum(avg_over_time(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}[%s])) by (instance, device, %s, job)) by (instance, device, %s)`
  629. cfg := pds.promConfig
  630. durStr := timeutil.DurationString(end.Sub(start))
  631. if durStr == "" {
  632. panic("failed to parse duration string passed to QueryLocalStorageUsedAvg")
  633. }
  634. queryLocalStorageUsedAvg := fmt.Sprintf(localStorageUsedAvgQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel, cfg.ClusterLabel)
  635. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  636. return ctx.QueryAtTime(queryLocalStorageUsedAvg, end)
  637. }
  638. func (pds *PrometheusDataSource) QueryLocalStorageUsedMax(start, end time.Time) source.QueryResultsChan {
  639. // `max(sum(max_over_time(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}[%s])) by (instance, device, %s, job)) by (instance, device, %s)`
  640. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  641. const localStorageUsedMaxQuery = `max(sum(max_over_time(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}[%s])) by (instance, device, %s, job)) by (instance, device, %s)`
  642. cfg := pds.promConfig
  643. durStr := timeutil.DurationString(end.Sub(start))
  644. if durStr == "" {
  645. panic("failed to parse duration string passed to QueryLocalStorageUsedMax")
  646. }
  647. queryLocalStorageUsedMax := fmt.Sprintf(localStorageUsedMaxQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel, cfg.ClusterLabel)
  648. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  649. return ctx.QueryAtTime(queryLocalStorageUsedMax, end)
  650. }
  651. func (pds *PrometheusDataSource) QueryLocalStorageBytes(start, end time.Time) source.QueryResultsChan {
  652. // `avg_over_time(sum(container_fs_limit_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm])`
  653. // env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  654. const localStorageBytesQuery = `avg_over_time(sum(container_fs_limit_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm])`
  655. cfg := pds.promConfig
  656. minsPerResolution := cfg.DataResolutionMinutes
  657. durStr := timeutil.DurationString(end.Sub(start))
  658. if durStr == "" {
  659. panic("failed to parse duration string passed to QueryLocalStorageBytes")
  660. }
  661. queryLocalStorageBytes := fmt.Sprintf(localStorageBytesQuery, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  662. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  663. return ctx.QueryAtTime(queryLocalStorageBytes, end)
  664. }
  665. func (pds *PrometheusDataSource) QueryLocalStorageActiveMinutes(start, end time.Time) source.QueryResultsChan {
  666. // `count(node_total_hourly_cost{%s}) by (%s, node)[%s:%dm]`
  667. // env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  668. const localStorageActiveMinutesQuery = `count(node_total_hourly_cost{%s}) by (%s, node)[%s:%dm]`
  669. cfg := pds.promConfig
  670. minsPerResolution := cfg.DataResolutionMinutes
  671. durStr := timeutil.DurationString(end.Sub(start))
  672. if durStr == "" {
  673. panic("failed to parse duration string passed to QueryLocalStorageActiveMinutes")
  674. }
  675. queryLocalStorageActiveMins := fmt.Sprintf(localStorageActiveMinutesQuery, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  676. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  677. return ctx.QueryAtTime(queryLocalStorageActiveMins, end)
  678. }
  679. func (pds *PrometheusDataSource) QueryLocalStorageBytesByProvider(provider string, start, end time.Time) source.QueryResultsChan {
  680. var localStorageBytesQuery string
  681. key := strings.ToLower(provider)
  682. if f, ok := providerStorageQueries[key]; ok {
  683. localStorageBytesQuery = f(pds.promConfig, start, end, false, false)
  684. } else {
  685. localStorageBytesQuery = ""
  686. }
  687. if localStorageBytesQuery == "" {
  688. return newEmptyResult()
  689. }
  690. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  691. return ctx.QueryAtTime(localStorageBytesQuery, end)
  692. }
  693. func (pds *PrometheusDataSource) QueryLocalStorageUsedByProvider(provider string, start, end time.Time) source.QueryResultsChan {
  694. var localStorageUsedQuery string
  695. key := strings.ToLower(provider)
  696. if f, ok := providerStorageQueries[key]; ok {
  697. localStorageUsedQuery = f(pds.promConfig, start, end, false, true)
  698. } else {
  699. localStorageUsedQuery = ""
  700. }
  701. if localStorageUsedQuery == "" {
  702. return newEmptyResult()
  703. }
  704. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  705. return ctx.QueryAtTime(localStorageUsedQuery, end)
  706. }
  707. func (pds *PrometheusDataSource) QueryNodeCPUHourlyCost(start, end time.Time) source.QueryResultsChan {
  708. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  709. const nodeCPUHourlyCostQuery = `avg(avg_over_time(node_cpu_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id)`
  710. cfg := pds.promConfig
  711. durStr := timeutil.DurationString(end.Sub(start))
  712. if durStr == "" {
  713. panic("failed to parse duration string passed to QueryNodeCPUHourlyCost")
  714. }
  715. queryNodeCPUHourlyCost := fmt.Sprintf(nodeCPUHourlyCostQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  716. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  717. return ctx.QueryAtTime(queryNodeCPUHourlyCost, end)
  718. }
  719. func (pds *PrometheusDataSource) QueryNodeCPUCoresCapacity(start, end time.Time) source.QueryResultsChan {
  720. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  721. const nodeCPUCoresCapacityQuery = `avg(avg_over_time(kube_node_status_capacity_cpu_cores{%s}[%s])) by (%s, node)`
  722. cfg := pds.promConfig
  723. durStr := timeutil.DurationString(end.Sub(start))
  724. if durStr == "" {
  725. panic("failed to parse duration string passed to QueryNodeCPUCoresCapacity")
  726. }
  727. queryNodeCPUCoresCapacity := fmt.Sprintf(nodeCPUCoresCapacityQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  728. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  729. return ctx.QueryAtTime(queryNodeCPUCoresCapacity, end)
  730. }
  731. func (pds *PrometheusDataSource) QueryNodeCPUCoresAllocatable(start, end time.Time) source.QueryResultsChan {
  732. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  733. const nodeCPUCoresAllocatableQuery = `avg(avg_over_time(kube_node_status_allocatable_cpu_cores{%s}[%s])) by (%s, node)`
  734. cfg := pds.promConfig
  735. durStr := timeutil.DurationString(end.Sub(start))
  736. if durStr == "" {
  737. panic("failed to parse duration string passed to QueryNodeCPUCoresAllocatable")
  738. }
  739. queryNodeCPUCoresAllocatable := fmt.Sprintf(nodeCPUCoresAllocatableQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  740. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  741. return ctx.QueryAtTime(queryNodeCPUCoresAllocatable, end)
  742. }
  743. func (pds *PrometheusDataSource) QueryNodeRAMHourlyCost(start, end time.Time) source.QueryResultsChan {
  744. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  745. const nodeRAMHourlyCostQuery = `avg(avg_over_time(node_ram_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`
  746. cfg := pds.promConfig
  747. durStr := timeutil.DurationString(end.Sub(start))
  748. if durStr == "" {
  749. panic("failed to parse duration string passed to QueryNodeRAMHourlyCost")
  750. }
  751. queryNodeRAMHourlyCost := fmt.Sprintf(nodeRAMHourlyCostQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  752. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  753. return ctx.QueryAtTime(queryNodeRAMHourlyCost, end)
  754. }
  755. func (pds *PrometheusDataSource) QueryNodeRAMBytesCapacity(start, end time.Time) source.QueryResultsChan {
  756. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  757. const nodeRAMBytesCapacityQuery = `avg(avg_over_time(kube_node_status_capacity_memory_bytes{%s}[%s])) by (%s, node)`
  758. cfg := pds.promConfig
  759. durStr := timeutil.DurationString(end.Sub(start))
  760. if durStr == "" {
  761. panic("failed to parse duration string passed to QueryNodeRAMBytesCapacity")
  762. }
  763. queryNodeRAMBytesCapacity := fmt.Sprintf(nodeRAMBytesCapacityQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  764. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  765. return ctx.QueryAtTime(queryNodeRAMBytesCapacity, end)
  766. }
  767. func (pds *PrometheusDataSource) QueryNodeRAMBytesAllocatable(start, end time.Time) source.QueryResultsChan {
  768. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  769. const nodeRAMBytesAllocatableQuery = `avg(avg_over_time(kube_node_status_allocatable_memory_bytes{%s}[%s])) by (%s, node)`
  770. cfg := pds.promConfig
  771. durStr := timeutil.DurationString(end.Sub(start))
  772. if durStr == "" {
  773. panic("failed to parse duration string passed to QueryNodeRAMBytesAllocatable")
  774. }
  775. queryNodeRAMBytesAllocatable := fmt.Sprintf(nodeRAMBytesAllocatableQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  776. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  777. return ctx.QueryAtTime(queryNodeRAMBytesAllocatable, end)
  778. }
  779. func (pds *PrometheusDataSource) QueryNodeGPUCount(start, end time.Time) source.QueryResultsChan {
  780. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  781. const nodeGPUCountQuery = `avg(avg_over_time(node_gpu_count{%s}[%s])) by (%s, node, provider_id)`
  782. cfg := pds.promConfig
  783. durStr := timeutil.DurationString(end.Sub(start))
  784. if durStr == "" {
  785. panic("failed to parse duration string passed to QueryNodeGPUCount")
  786. }
  787. queryNodeGPUCount := fmt.Sprintf(nodeGPUCountQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  788. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  789. return ctx.QueryAtTime(queryNodeGPUCount, end)
  790. }
  791. func (pds *PrometheusDataSource) QueryNodeGPUHourlyCost(start, end time.Time) source.QueryResultsChan {
  792. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  793. const nodeGPUHourlyCostQuery = `avg(avg_over_time(node_gpu_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id)`
  794. cfg := pds.promConfig
  795. durStr := timeutil.DurationString(end.Sub(start))
  796. if durStr == "" {
  797. panic("failed to parse duration string passed to QueryNodeGPUHourlyCost")
  798. }
  799. queryNodeGPUHourlyCost := fmt.Sprintf(nodeGPUHourlyCostQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  800. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  801. return ctx.QueryAtTime(queryNodeGPUHourlyCost, end)
  802. }
  803. func (pds *PrometheusDataSource) QueryNodeLabels(start, end time.Time) source.QueryResultsChan {
  804. // env.GetPromClusterFilter(), durStr, minsPerResolution)
  805. const labelsQuery = `count_over_time(kube_node_labels{%s}[%s:%dm])`
  806. cfg := pds.promConfig
  807. minsPerResolution := cfg.DataResolutionMinutes
  808. durStr := timeutil.DurationString(end.Sub(start))
  809. if durStr == "" {
  810. panic("failed to parse duration string passed to QueryNodeLabels")
  811. }
  812. queryLabels := fmt.Sprintf(labelsQuery, cfg.ClusterFilter, durStr, minsPerResolution)
  813. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  814. return ctx.QueryAtTime(queryLabels, end)
  815. }
  816. func (pds *PrometheusDataSource) QueryNodeActiveMinutes(start, end time.Time) source.QueryResultsChan {
  817. // env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  818. const activeMinsQuery = `avg(node_total_hourly_cost{%s}) by (node, %s, provider_id)[%s:%dm]`
  819. cfg := pds.promConfig
  820. minsPerResolution := cfg.DataResolutionMinutes
  821. durStr := timeutil.DurationString(end.Sub(start))
  822. if durStr == "" {
  823. panic("failed to parse duration string passed to QueryNodeActiveMinutes")
  824. }
  825. queryActiveMins := fmt.Sprintf(activeMinsQuery, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  826. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  827. return ctx.QueryAtTime(queryActiveMins, end)
  828. }
  829. func (pds *PrometheusDataSource) QueryNodeIsSpot(start, end time.Time) source.QueryResultsChan {
  830. // env.GetPromClusterFilter(), durStr, minsPerResolution)
  831. const isSpotQuery = `avg_over_time(kubecost_node_is_spot{%s}[%s:%dm])`
  832. cfg := pds.promConfig
  833. minsPerResolution := cfg.DataResolutionMinutes
  834. durStr := timeutil.DurationString(end.Sub(start))
  835. if durStr == "" {
  836. panic("failed to parse duration string passed to QueryNodeIsSpot")
  837. }
  838. queryIsSpot := fmt.Sprintf(isSpotQuery, cfg.ClusterFilter, durStr, minsPerResolution)
  839. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  840. return ctx.QueryAtTime(queryIsSpot, end)
  841. }
  842. func (pds *PrometheusDataSource) QueryNodeCPUModeTotal(start, end time.Time) source.QueryResultsChan {
  843. // env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel())
  844. const nodeCPUModeTotalQuery = `sum(rate(node_cpu_seconds_total{%s}[%s:%dm])) by (kubernetes_node, %s, mode)`
  845. cfg := pds.promConfig
  846. minsPerResolution := cfg.DataResolutionMinutes
  847. durStr := timeutil.DurationString(end.Sub(start))
  848. if durStr == "" {
  849. panic("failed to parse duration string passed to QueryNodeCPUModeTotal")
  850. }
  851. queryCPUModeTotal := fmt.Sprintf(nodeCPUModeTotalQuery, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
  852. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  853. return ctx.QueryAtTime(queryCPUModeTotal, end)
  854. }
  855. func (pds *PrometheusDataSource) QueryNodeCPUModePercent(start, end time.Time) source.QueryResultsChan {
  856. const fmtQueryCPUModePct = `
  857. sum(rate(node_cpu_seconds_total{%s}[%s])) by (%s, mode) / ignoring(mode)
  858. group_left sum(rate(node_cpu_seconds_total{%s}[%s])) by (%s)
  859. `
  860. // env.GetPromClusterFilter(), windowStr, env.GetPromClusterLabel(), env.GetPromClusterFilter(), windowStr, fmtOffset, env.GetPromClusterLabel()
  861. cfg := pds.promConfig
  862. durStr := timeutil.DurationString(end.Sub(start))
  863. if durStr == "" {
  864. panic("failed to parse duration string passed to QueryNodeCPUModePercent")
  865. }
  866. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, cfg.ClusterFilter, durStr, cfg.ClusterLabel, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  867. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  868. return ctx.QueryAtTime(queryCPUModePct, end)
  869. }
  870. func (pds *PrometheusDataSource) QueryNodeRAMSystemPercent(start, end time.Time) source.QueryResultsChan {
  871. // env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  872. const nodeRAMSystemPctQuery = `sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system", %s}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`
  873. cfg := pds.promConfig
  874. minsPerResolution := cfg.DataResolutionMinutes
  875. durStr := timeutil.DurationString(end.Sub(start))
  876. if durStr == "" {
  877. panic("failed to parse duration string passed to QueryNodeRAMSystemPercent")
  878. }
  879. queryRAMSystemPct := fmt.Sprintf(nodeRAMSystemPctQuery, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel, cfg.ClusterLabel)
  880. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  881. return ctx.QueryAtTime(queryRAMSystemPct, end)
  882. }
  883. func (pds *PrometheusDataSource) QueryNodeRAMUserPercent(start, end time.Time) source.QueryResultsChan {
  884. // env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  885. const nodeRAMUserPctQuery = `sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system", %s}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`
  886. cfg := pds.promConfig
  887. minsPerResolution := cfg.DataResolutionMinutes
  888. durStr := timeutil.DurationString(end.Sub(start))
  889. if durStr == "" {
  890. panic("failed to parse duration string passed to QueryNodeRAMUserPercent")
  891. }
  892. queryRAMUserPct := fmt.Sprintf(nodeRAMUserPctQuery, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel, cfg.ClusterLabel)
  893. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  894. return ctx.QueryAtTime(queryRAMUserPct, end)
  895. }
  896. func (pds *PrometheusDataSource) QueryLBPricePerHr(start, end time.Time) source.QueryResultsChan {
  897. const queryFmtLBCostPerHr = `avg(avg_over_time(kubecost_load_balancer_cost{%s}[%s])) by (namespace, service_name, ingress_ip, %s)`
  898. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  899. cfg := pds.promConfig
  900. durStr := timeutil.DurationString(end.Sub(start))
  901. if durStr == "" {
  902. panic("failed to parse duration string passed to QueryLBPricePerHr")
  903. }
  904. queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  905. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  906. return ctx.QueryAtTime(queryLBCostPerHr, end)
  907. }
  908. func (pds *PrometheusDataSource) QueryLBActiveMinutes(start, end time.Time) source.QueryResultsChan {
  909. const lbActiveMinutesQuery = `avg(kubecost_load_balancer_cost{%s}) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`
  910. // env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  911. cfg := pds.promConfig
  912. minsPerResolution := cfg.DataResolutionMinutes
  913. durStr := timeutil.DurationString(end.Sub(start))
  914. if durStr == "" {
  915. panic("failed to parse duration string passed to QueryLBActiveMinutes")
  916. }
  917. queryLBActiveMins := fmt.Sprintf(lbActiveMinutesQuery, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  918. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  919. return ctx.QueryAtTime(queryLBActiveMins, end)
  920. }
  921. func (pds *PrometheusDataSource) QueryClusterManagementDuration(start, end time.Time) source.QueryResultsChan {
  922. const clusterManagementDurationQuery = `avg(kubecost_cluster_management_cost{%s}) by (%s, provisioner_name)[%s:%dm]`
  923. cfg := pds.promConfig
  924. minsPerResolution := cfg.DataResolutionMinutes
  925. durStr := timeutil.DurationString(end.Sub(start))
  926. if durStr == "" {
  927. panic("failed to parse duration string passed to QueryClusterManagementDuration")
  928. }
  929. queryClusterManagementDuration := fmt.Sprintf(clusterManagementDurationQuery, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
  930. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  931. return ctx.QueryAtTime(queryClusterManagementDuration, end)
  932. }
  933. func (pds *PrometheusDataSource) QueryClusterManagementPricePerHr(start, end time.Time) source.QueryResultsChan {
  934. const clusterManagementCostQuery = `avg(avg_over_time(kubecost_cluster_management_cost{%s}[%s])) by (%s, provisioner_name)`
  935. // env.GetPromClusterFilter(), durationStr, env.GetPromClusterLabel()
  936. cfg := pds.promConfig
  937. durStr := timeutil.DurationString(end.Sub(start))
  938. if durStr == "" {
  939. panic("failed to parse duration string passed to QueryClusterManagementCost")
  940. }
  941. queryClusterManagementCost := fmt.Sprintf(clusterManagementCostQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  942. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  943. return ctx.QueryAtTime(queryClusterManagementCost, end)
  944. }
  945. func (pds *PrometheusDataSource) QueryDataCount(start, end time.Time) source.QueryResultsChan {
  946. const fmtQueryDataCount = `
  947. count_over_time(sum(kube_node_status_capacity_cpu_cores{%s}) by (%s)[%s:%dm]) * %d
  948. `
  949. // env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, minsPerResolution)
  950. cfg := pds.promConfig
  951. minsPerResolution := cfg.DataResolutionMinutes
  952. durStr := timeutil.DurationString(end.Sub(start))
  953. if durStr == "" {
  954. panic("failed to parse duration string passed to QueryDataCount")
  955. }
  956. queryDataCount := fmt.Sprintf(fmtQueryDataCount, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution, minsPerResolution)
  957. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  958. return ctx.QueryAtTime(queryDataCount, end)
  959. }
  960. func (pds *PrometheusDataSource) QueryTotalGPU(start, end time.Time) source.QueryResultsChan {
  961. const fmtQueryTotalGPU = `
  962. sum(
  963. sum_over_time(node_gpu_hourly_cost{%s}[%s:%dm]) * %f
  964. ) by (%s)
  965. `
  966. // env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
  967. cfg := pds.promConfig
  968. minsPerResolution := cfg.DataResolutionMinutes
  969. durStr := timeutil.DurationString(end.Sub(start))
  970. if durStr == "" {
  971. panic("failed to parse duration string passed to QueryTotalGPU")
  972. }
  973. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  974. // value, converts it to a cumulative value; i.e.
  975. // [$/hr] * [min/res]*[hr/min] = [$/res]
  976. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  977. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, cfg.ClusterFilter, durStr, minsPerResolution, hourlyToCumulative, cfg.ClusterLabel)
  978. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  979. return ctx.QueryAtTime(queryTotalGPU, end)
  980. }
  981. func (pds *PrometheusDataSource) QueryTotalCPU(start, end time.Time) source.QueryResultsChan {
  982. const fmtQueryTotalCPU = `
  983. sum(
  984. sum_over_time(avg(kube_node_status_capacity_cpu_cores{%s}) by (node, %s)[%s:%dm]) *
  985. avg(avg_over_time(node_cpu_hourly_cost{%s}[%s:%dm])) by (node, %s) * %f
  986. ) by (%s)
  987. `
  988. // env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel()
  989. cfg := pds.promConfig
  990. minsPerResolution := cfg.DataResolutionMinutes
  991. durStr := timeutil.DurationString(end.Sub(start))
  992. if durStr == "" {
  993. panic("failed to parse duration string passed to QueryTotalCPU")
  994. }
  995. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  996. // value, converts it to a cumulative value; i.e.
  997. // [$/hr] * [min/res]*[hr/min] = [$/res]
  998. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  999. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel, hourlyToCumulative, cfg.ClusterLabel)
  1000. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  1001. return ctx.QueryAtTime(queryTotalCPU, end)
  1002. }
  1003. func (pds *PrometheusDataSource) QueryTotalRAM(start, end time.Time) source.QueryResultsChan {
  1004. const fmtQueryTotalRAM = `
  1005. sum(
  1006. sum_over_time(avg(kube_node_status_capacity_memory_bytes{%s}) by (node, %s)[%s:%dm]) / 1024 / 1024 / 1024 *
  1007. avg(avg_over_time(node_ram_hourly_cost{%s}[%s:%dm])) by (node, %s) * %f
  1008. ) by (%s)
  1009. `
  1010. // env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, env.GetPromClusterFilter(), windowStr, minsPerResolution, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  1011. cfg := pds.promConfig
  1012. minsPerResolution := cfg.DataResolutionMinutes
  1013. durStr := timeutil.DurationString(end.Sub(start))
  1014. if durStr == "" {
  1015. panic("failed to parse duration string passed to QueryTotalRAM")
  1016. }
  1017. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  1018. // value, converts it to a cumulative value; i.e.
  1019. // [$/hr] * [min/res]*[hr/min] = [$/res]
  1020. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  1021. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel, hourlyToCumulative, cfg.ClusterLabel)
  1022. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  1023. return ctx.QueryAtTime(queryTotalRAM, end)
  1024. }
  1025. func (pds *PrometheusDataSource) QueryTotalStorage(start, end time.Time) source.QueryResultsChan {
  1026. const fmtQueryTotalStorage = `
  1027. sum(
  1028. sum_over_time(avg(kube_persistentvolume_capacity_bytes{%s}) by (persistentvolume, %s)[%s:%dm]) / 1024 / 1024 / 1024 *
  1029. avg(avg_over_time(pv_hourly_cost{%s}[%s:%dm])) by (persistentvolume, %s) * %f
  1030. ) by (%s)
  1031. `
  1032. // env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, env.GetPromClusterFilter(), windowStr, minsPerResolution, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  1033. cfg := pds.promConfig
  1034. minsPerResolution := cfg.DataResolutionMinutes
  1035. durStr := timeutil.DurationString(end.Sub(start))
  1036. if durStr == "" {
  1037. panic("failed to parse duration string passed to QueryTotalStorage")
  1038. }
  1039. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  1040. // value, converts it to a cumulative value; i.e.
  1041. // [$/hr] * [min/res]*[hr/min] = [$/res]
  1042. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  1043. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel, hourlyToCumulative, cfg.ClusterLabel)
  1044. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  1045. return ctx.QueryAtTime(queryTotalStorage, end)
  1046. }
  1047. func (pds *PrometheusDataSource) QueryClusterCores(start, end time.Time, step time.Duration) source.QueryResultsChan {
  1048. const queryClusterCores = `sum(
  1049. avg(avg_over_time(kube_node_status_capacity_cpu_cores{%s}[%s])) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost{%s}[%s])) by (node, %s) * 730 +
  1050. avg(avg_over_time(node_gpu_hourly_cost{%s}[%s])) by (node, %s) * 730
  1051. ) by (%s)`
  1052. // env.GetPromClusterFilter(), fmtWindow, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  1053. cfg := pds.promConfig
  1054. durStr := timeutil.DurationString(step)
  1055. if durStr == "" {
  1056. panic("failed to parse duration string passed to QueryClusterCores")
  1057. }
  1058. clusterCoresQuery := fmt.Sprintf(queryClusterCores, cfg.ClusterFilter, durStr, cfg.ClusterLabel, cfg.ClusterFilter, durStr, cfg.ClusterLabel, cfg.ClusterFilter, durStr, cfg.ClusterLabel, cfg.ClusterLabel)
  1059. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  1060. return ctx.QueryRange(clusterCoresQuery, start, end, step)
  1061. }
  1062. func (pds *PrometheusDataSource) QueryClusterRAM(start, end time.Time, step time.Duration) source.QueryResultsChan {
  1063. const queryClusterRAM = `sum(
  1064. avg(avg_over_time(kube_node_status_capacity_memory_bytes{%s}[%s])) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost{%s}[%s])) by (node, %s) * 730
  1065. ) by (%s)`
  1066. // env.GetPromClusterFilter(), fmtWindow, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  1067. cfg := pds.promConfig
  1068. durStr := timeutil.DurationString(step)
  1069. if durStr == "" {
  1070. panic("failed to parse duration string passed to QueryClusterRAM")
  1071. }
  1072. clusterRAMQuery := fmt.Sprintf(queryClusterRAM, cfg.ClusterFilter, durStr, cfg.ClusterLabel, cfg.ClusterFilter, durStr, cfg.ClusterLabel, cfg.ClusterLabel)
  1073. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  1074. return ctx.QueryRange(clusterRAMQuery, start, end, step)
  1075. }
  1076. func (pds *PrometheusDataSource) QueryClusterStorage(start, end time.Time, step time.Duration) source.QueryResultsChan {
  1077. return pds.QueryClusterStorageByProvider("", start, end, step)
  1078. }
  1079. func (pds *PrometheusDataSource) QueryClusterStorageByProvider(provider string, start, end time.Time, step time.Duration) source.QueryResultsChan {
  1080. const queryStorage = `sum(
  1081. avg(avg_over_time(pv_hourly_cost{%s}[%s])) by (persistentvolume, %s) * 730
  1082. * avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s])) by (persistentvolume, %s) / 1024 / 1024 / 1024
  1083. ) by (%s) %s`
  1084. // env.GetPromClusterFilter(), fmtWindow, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  1085. var localStorageQuery string
  1086. if provider != "" {
  1087. key := strings.ToLower(provider)
  1088. if f, ok := providerStorageQueries[key]; ok {
  1089. localStorageQuery = f(pds.promConfig, start, end, true, false)
  1090. } else {
  1091. localStorageQuery = ""
  1092. }
  1093. }
  1094. if localStorageQuery != "" {
  1095. localStorageQuery = fmt.Sprintf(" + %s", localStorageQuery)
  1096. }
  1097. cfg := pds.promConfig
  1098. durStr := timeutil.DurationString(step)
  1099. if durStr == "" {
  1100. panic("failed to parse duration string passed to QueryClusterCores")
  1101. }
  1102. clusterStorageQuery := fmt.Sprintf(queryStorage, cfg.ClusterFilter, durStr, cfg.ClusterLabel, cfg.ClusterFilter, durStr, cfg.ClusterLabel, cfg.ClusterLabel, localStorageQuery)
  1103. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  1104. return ctx.QueryRange(clusterStorageQuery, start, end, step)
  1105. }
  1106. func (pds *PrometheusDataSource) QueryClusterTotal(start, end time.Time, step time.Duration) source.QueryResultsChan {
  1107. return pds.QueryClusterTotalByProvider("", start, end, step)
  1108. }
  1109. func (pds *PrometheusDataSource) QueryClusterTotalByProvider(provider string, start, end time.Time, step time.Duration) source.QueryResultsChan {
  1110. const queryTotal = `sum(avg(node_total_hourly_cost{%s}) by (node, %s)) * 730 +
  1111. sum(
  1112. avg(avg_over_time(pv_hourly_cost{%s}[1h])) by (persistentvolume, %s) * 730
  1113. * avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
  1114. ) by (%s) %s`
  1115. var localStorageQuery string
  1116. if provider != "" {
  1117. key := strings.ToLower(provider)
  1118. if f, ok := providerStorageQueries[key]; ok {
  1119. localStorageQuery = f(pds.promConfig, start, end, true, false)
  1120. } else {
  1121. localStorageQuery = ""
  1122. }
  1123. }
  1124. if localStorageQuery != "" {
  1125. localStorageQuery = fmt.Sprintf(" + %s", localStorageQuery)
  1126. }
  1127. cfg := pds.promConfig
  1128. durStr := timeutil.DurationString(step)
  1129. if durStr == "" {
  1130. panic("failed to parse duration string passed to QueryClusterTotalByProvider")
  1131. }
  1132. clusterTotalQuery := fmt.Sprintf(queryTotal, cfg.ClusterFilter, cfg.ClusterLabel, cfg.ClusterFilter, cfg.ClusterLabel, cfg.ClusterFilter, cfg.ClusterLabel, cfg.ClusterLabel, localStorageQuery)
  1133. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  1134. return ctx.QueryRange(clusterTotalQuery, start, end, step)
  1135. }
  1136. func (pds *PrometheusDataSource) QueryClusterNodes(start, end time.Time, step time.Duration) source.QueryResultsChan {
  1137. return pds.QueryClusterNodesByProvider("", start, end, step)
  1138. }
  1139. func (pds *PrometheusDataSource) QueryClusterNodesByProvider(provider string, start, end time.Time, step time.Duration) source.QueryResultsChan {
  1140. const queryNodes = `sum(avg(node_total_hourly_cost{%s}) by (node, %s)) * 730 %s`
  1141. // env.GetPromClusterFilter(), env.GetPromClusterLabel(), localStorageQuery)
  1142. var localStorageQuery string
  1143. if provider != "" {
  1144. key := strings.ToLower(provider)
  1145. if f, ok := providerStorageQueries[key]; ok {
  1146. localStorageQuery = f(pds.promConfig, start, end, true, false)
  1147. } else {
  1148. localStorageQuery = ""
  1149. }
  1150. }
  1151. if localStorageQuery != "" {
  1152. localStorageQuery = fmt.Sprintf(" + %s", localStorageQuery)
  1153. }
  1154. cfg := pds.promConfig
  1155. durStr := timeutil.DurationString(step)
  1156. if durStr == "" {
  1157. panic("failed to parse duration string passed to QueryClusterNodesByProvider")
  1158. }
  1159. clusterNodesCostQuery := fmt.Sprintf(queryNodes, cfg.ClusterFilter, cfg.ClusterLabel, localStorageQuery)
  1160. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  1161. return ctx.QueryRange(clusterNodesCostQuery, start, end, step)
  1162. }
  1163. // AllocationMetricQuerier
  1164. func (pds *PrometheusDataSource) QueryPods(start, end time.Time) source.QueryResultsChan {
  1165. const queryFmtPods = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, %s)[%s:%s]`
  1166. // env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
  1167. cfg := pds.promConfig
  1168. resolution := cfg.DataResolution
  1169. resStr := timeutil.DurationString(resolution)
  1170. durStr := timeutil.DurationString(end.Sub(start))
  1171. if durStr == "" {
  1172. panic("failed to parse duration string passed to QueryPods")
  1173. }
  1174. queryPods := fmt.Sprintf(queryFmtPods, cfg.ClusterFilter, cfg.ClusterLabel, durStr, resStr)
  1175. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1176. return ctx.QueryAtTime(queryPods, end)
  1177. }
  1178. func (pds *PrometheusDataSource) QueryPodsUID(start, end time.Time) source.QueryResultsChan {
  1179. const queryFmtPodsUID = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, uid, %s)[%s:%s]`
  1180. // env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
  1181. cfg := pds.promConfig
  1182. resolution := cfg.DataResolution
  1183. resStr := timeutil.DurationString(resolution)
  1184. durStr := timeutil.DurationString(end.Sub(start))
  1185. if durStr == "" {
  1186. panic("failed to parse duration string passed to QueryPodsUID")
  1187. }
  1188. queryPodsUID := fmt.Sprintf(queryFmtPodsUID, cfg.ClusterFilter, cfg.ClusterLabel, durStr, resStr)
  1189. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1190. return ctx.QueryAtTime(queryPodsUID, end)
  1191. }
  1192. func (pds *PrometheusDataSource) QueryRAMBytesAllocated(start, end time.Time) source.QueryResultsChan {
  1193. const queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s, provider_id)`
  1194. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  1195. cfg := pds.promConfig
  1196. durStr := timeutil.DurationString(end.Sub(start))
  1197. if durStr == "" {
  1198. panic("failed to parse duration string passed to QueryRAMBytesAllocated")
  1199. }
  1200. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1201. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1202. return ctx.QueryAtTime(queryRAMBytesAllocated, end)
  1203. }
  1204. func (pds *PrometheusDataSource) QueryRAMRequests(start, end time.Time) source.QueryResultsChan {
  1205. const queryFmtRAMRequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
  1206. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
  1207. cfg := pds.promConfig
  1208. durStr := timeutil.DurationString(end.Sub(start))
  1209. if durStr == "" {
  1210. panic("failed to parse duration string passed to QueryRAMRequests")
  1211. }
  1212. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1213. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1214. return ctx.QueryAtTime(queryRAMRequests, end)
  1215. }
  1216. func (pds *PrometheusDataSource) QueryRAMUsageAvg(start, end time.Time) source.QueryResultsChan {
  1217. const queryFmtRAMUsageAvg = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  1218. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
  1219. cfg := pds.promConfig
  1220. durStr := timeutil.DurationString(end.Sub(start))
  1221. if durStr == "" {
  1222. panic("failed to parse duration string passed to QueryRAMUsageAvg")
  1223. }
  1224. queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1225. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1226. return ctx.QueryAtTime(queryRAMUsageAvg, end)
  1227. }
  1228. func (pds *PrometheusDataSource) QueryRAMUsageMax(start, end time.Time) source.QueryResultsChan {
  1229. const queryFmtRAMUsageMax = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  1230. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
  1231. cfg := pds.promConfig
  1232. durStr := timeutil.DurationString(end.Sub(start))
  1233. if durStr == "" {
  1234. panic("failed to parse duration string passed to QueryRAMUsageMax")
  1235. }
  1236. queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1237. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1238. return ctx.QueryAtTime(queryRAMUsageMax, end)
  1239. }
  1240. func (pds *PrometheusDataSource) QueryCPUCoresAllocated(start, end time.Time) source.QueryResultsChan {
  1241. const queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
  1242. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
  1243. cfg := pds.promConfig
  1244. durStr := timeutil.DurationString(end.Sub(start))
  1245. if durStr == "" {
  1246. panic("failed to parse duration string passed to QueryCPUCoresAllocated")
  1247. }
  1248. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1249. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1250. return ctx.QueryAtTime(queryCPUCoresAllocated, end)
  1251. }
  1252. func (pds *PrometheusDataSource) QueryCPURequests(start, end time.Time) source.QueryResultsChan {
  1253. const queryFmtCPURequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
  1254. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
  1255. cfg := pds.promConfig
  1256. durStr := timeutil.DurationString(end.Sub(start))
  1257. if durStr == "" {
  1258. panic("failed to parse duration string passed to QueryCPURequests")
  1259. }
  1260. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1261. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1262. return ctx.QueryAtTime(queryCPURequests, end)
  1263. }
  1264. func (pds *PrometheusDataSource) QueryCPUUsageAvg(start, end time.Time) source.QueryResultsChan {
  1265. const queryFmtCPUUsageAvg = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  1266. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
  1267. cfg := pds.promConfig
  1268. durStr := timeutil.DurationString(end.Sub(start))
  1269. if durStr == "" {
  1270. panic("failed to parse duration string passed to QueryCPUUsageAvg")
  1271. }
  1272. queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1273. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1274. return ctx.QueryAtTime(queryCPUUsageAvg, end)
  1275. }
  1276. func (pds *PrometheusDataSource) QueryCPUUsageMax(start, end time.Time) source.QueryResultsChan {
  1277. // Because we use container_cpu_usage_seconds_total to calculate CPU usage
  1278. // at any given "instant" of time, we need to use an irate or rate. To then
  1279. // calculate a max (or any aggregation) we have to perform an aggregation
  1280. // query on top of an instant-by-instant maximum. Prometheus supports this
  1281. // type of query with a "subquery" [1], however it is reportedly expensive
  1282. // to make such a query. By default, Kubecost's Prometheus config includes
  1283. // a recording rule that keeps track of the instant-by-instant irate for CPU
  1284. // usage. The metric in this query is created by that recording rule.
  1285. //
  1286. // [1] https://prometheus.io/blog/2019/01/28/subquery-support/
  1287. //
  1288. // If changing the name of the recording rule, make sure to update the
  1289. // corresponding diagnostic query to avoid confusion.
  1290. const queryFmtCPUUsageMaxRecordingRule = `max(max_over_time(kubecost_container_cpu_usage_irate{%s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  1291. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
  1292. // This is the subquery equivalent of the above recording rule query. It is
  1293. // more expensive, but does not require the recording rule. It should be
  1294. // used as a fallback query if the recording rule data does not exist.
  1295. //
  1296. // The parameter after the colon [:<thisone>] in the subquery affects the
  1297. // resolution of the subquery.
  1298. // The parameter after the metric ...{}[<thisone>] should be set to 2x
  1299. // the resolution, to make sure the irate always has two points to query
  1300. // in case the Prom scrape duration has been reduced to be equal to the
  1301. // ETL resolution.
  1302. const queryFmtCPUUsageMaxSubquery = `max(max_over_time(irate(container_cpu_usage_seconds_total{container!="POD", container!="", %s}[%s])[%s:%s])) by (container, pod_name, pod, namespace, instance, %s)`
  1303. // env.GetPromClusterFilter(), doubleResStr, durStr, resStr, env.GetPromClusterLabel()
  1304. cfg := pds.promConfig
  1305. durStr := timeutil.DurationString(end.Sub(start))
  1306. if durStr == "" {
  1307. panic("failed to parse duration string passed to QueryCPUUsageMax")
  1308. }
  1309. queryCPUUsageMaxRecordingRule := fmt.Sprintf(queryFmtCPUUsageMaxRecordingRule, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1310. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1311. resCPUUsageMaxRR := ctx.QueryAtTime(queryCPUUsageMaxRecordingRule, end)
  1312. resCPUUsageMax, _ := resCPUUsageMaxRR.Await()
  1313. if len(resCPUUsageMax) > 0 {
  1314. return wrapResults(queryCPUUsageMaxRecordingRule, resCPUUsageMax)
  1315. }
  1316. resolution := cfg.DataResolution
  1317. resStr := timeutil.DurationString(resolution)
  1318. doubleResStr := timeutil.DurationString(2 * resolution)
  1319. queryCPUUsageMaxSubquery := fmt.Sprintf(queryFmtCPUUsageMaxSubquery, cfg.ClusterFilter, doubleResStr, durStr, resStr, cfg.ClusterLabel)
  1320. return ctx.QueryAtTime(queryCPUUsageMaxSubquery, end)
  1321. }
  1322. func (pds *PrometheusDataSource) QueryGPUsRequested(start, end time.Time) source.QueryResultsChan {
  1323. const queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
  1324. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
  1325. cfg := pds.promConfig
  1326. durStr := timeutil.DurationString(end.Sub(start))
  1327. if durStr == "" {
  1328. panic("failed to parse duration string passed to QueryGPUsRequested")
  1329. }
  1330. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1331. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1332. return ctx.QueryAtTime(queryGPUsRequested, end)
  1333. }
  1334. func (pds *PrometheusDataSource) QueryGPUsUsageAvg(start, end time.Time) source.QueryResultsChan {
  1335. const queryFmtGPUsUsageAvg = `avg(avg_over_time(DCGM_FI_PROF_GR_ENGINE_ACTIVE{container!=""}[%s])) by (container, pod, namespace, %s)`
  1336. // durStr, env.GetPromClusterLabel()
  1337. cfg := pds.promConfig
  1338. durStr := timeutil.DurationString(end.Sub(start))
  1339. if durStr == "" {
  1340. panic("failed to parse duration string passed to QueryGPUsUsageAvg")
  1341. }
  1342. queryGPUsUsageAvg := fmt.Sprintf(queryFmtGPUsUsageAvg, durStr, cfg.ClusterLabel)
  1343. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1344. return ctx.QueryAtTime(queryGPUsUsageAvg, end)
  1345. }
  1346. func (pds *PrometheusDataSource) QueryGPUsUsageMax(start, end time.Time) source.QueryResultsChan {
  1347. const queryFmtGPUsUsageMax = `max(max_over_time(DCGM_FI_PROF_GR_ENGINE_ACTIVE{container!=""}[%s])) by (container, pod, namespace, %s)`
  1348. // durStr, env.GetPromClusterLabel()
  1349. cfg := pds.promConfig
  1350. durStr := timeutil.DurationString(end.Sub(start))
  1351. if durStr == "" {
  1352. panic("failed to parse duration string passed to QueryGPUsUsageMax")
  1353. }
  1354. queryGPUsUsageMax := fmt.Sprintf(queryFmtGPUsUsageMax, durStr, cfg.ClusterLabel)
  1355. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1356. return ctx.QueryAtTime(queryGPUsUsageMax, end)
  1357. }
  1358. func (pds *PrometheusDataSource) QueryGPUsAllocated(start, end time.Time) source.QueryResultsChan {
  1359. const queryFmtGPUsAllocated = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
  1360. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
  1361. cfg := pds.promConfig
  1362. durStr := timeutil.DurationString(end.Sub(start))
  1363. if durStr == "" {
  1364. panic("failed to parse duration string passed to QueryGPUsAllocated")
  1365. }
  1366. queryGPUsAllocated := fmt.Sprintf(queryFmtGPUsAllocated, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1367. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1368. return ctx.QueryAtTime(queryGPUsAllocated, end)
  1369. }
  1370. func (pds *PrometheusDataSource) QueryNodeCostPerCPUHr(start, end time.Time) source.QueryResultsChan {
  1371. const queryFmtNodeCostPerCPUHr = `avg(avg_over_time(node_cpu_hourly_cost{%s}[%s])) by (node, %s, instance_type, provider_id)`
  1372. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  1373. cfg := pds.promConfig
  1374. durStr := timeutil.DurationString(end.Sub(start))
  1375. if durStr == "" {
  1376. panic("failed to parse duration string passed to QueryNodeCostPerCPUHr")
  1377. }
  1378. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1379. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1380. return ctx.QueryAtTime(queryNodeCostPerCPUHr, end)
  1381. }
  1382. func (pds *PrometheusDataSource) QueryNodeCostPerRAMGiBHr(start, end time.Time) source.QueryResultsChan {
  1383. const queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost{%s}[%s])) by (node, %s, instance_type, provider_id)`
  1384. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  1385. cfg := pds.promConfig
  1386. durStr := timeutil.DurationString(end.Sub(start))
  1387. if durStr == "" {
  1388. panic("failed to parse duration string passed to QueryNodeCostPerRAMGiBHr")
  1389. }
  1390. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1391. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1392. return ctx.QueryAtTime(queryNodeCostPerRAMGiBHr, end)
  1393. }
  1394. func (pds *PrometheusDataSource) QueryNodeCostPerGPUHr(start, end time.Time) source.QueryResultsChan {
  1395. const queryFmtNodeCostPerGPUHr = `avg(avg_over_time(node_gpu_hourly_cost{%s}[%s])) by (node, %s, instance_type, provider_id)`
  1396. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  1397. cfg := pds.promConfig
  1398. durStr := timeutil.DurationString(end.Sub(start))
  1399. if durStr == "" {
  1400. panic("failed to parse duration string passed to QueryNodeCostPerGPUHr")
  1401. }
  1402. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1403. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1404. return ctx.QueryAtTime(queryNodeCostPerGPUHr, end)
  1405. }
  1406. func (pds *PrometheusDataSource) QueryNodeIsSpot2(start, end time.Time) source.QueryResultsChan {
  1407. const queryFmtNodeIsSpot = `avg_over_time(kubecost_node_is_spot{%s}[%s])`
  1408. // env.GetPromClusterFilter(), durStr)
  1409. cfg := pds.promConfig
  1410. durStr := timeutil.DurationString(end.Sub(start))
  1411. if durStr == "" {
  1412. panic("failed to parse duration string passed to QueryNodeIsSpot2")
  1413. }
  1414. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, cfg.ClusterFilter, durStr)
  1415. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1416. return ctx.QueryAtTime(queryNodeIsSpot, end)
  1417. }
  1418. func (pds *PrometheusDataSource) QueryPVCInfo2(start, end time.Time) source.QueryResultsChan {
  1419. const queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != "", %s}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]`
  1420. // env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
  1421. cfg := pds.promConfig
  1422. resolution := cfg.DataResolution
  1423. resStr := timeutil.DurationString(resolution)
  1424. durStr := timeutil.DurationString(end.Sub(start))
  1425. if durStr == "" {
  1426. panic("failed to parse duration string passed to QueryPVCInfo2")
  1427. }
  1428. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, cfg.ClusterFilter, cfg.ClusterLabel, durStr, resStr)
  1429. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1430. return ctx.QueryAtTime(queryPVCInfo, end)
  1431. }
  1432. func (pds *PrometheusDataSource) QueryPodPVCAllocation(start, end time.Time) source.QueryResultsChan {
  1433. const queryFmtPodPVCAllocation = `avg(avg_over_time(pod_pvc_allocation{%s}[%s])) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
  1434. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  1435. cfg := pds.promConfig
  1436. durStr := timeutil.DurationString(end.Sub(start))
  1437. if durStr == "" {
  1438. panic("failed to parse duration string passed to QueryPodPVCAllocation")
  1439. }
  1440. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1441. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1442. return ctx.QueryAtTime(queryPodPVCAllocation, end)
  1443. }
  1444. func (pds *PrometheusDataSource) QueryPVCBytesRequested(start, end time.Time) source.QueryResultsChan {
  1445. const queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{%s}[%s])) by (persistentvolumeclaim, namespace, %s)`
  1446. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  1447. cfg := pds.promConfig
  1448. durStr := timeutil.DurationString(end.Sub(start))
  1449. if durStr == "" {
  1450. panic("failed to parse duration string passed to QueryPVCBytesRequested")
  1451. }
  1452. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1453. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1454. return ctx.QueryAtTime(queryPVCBytesRequested, end)
  1455. }
  1456. func (pds *PrometheusDataSource) QueryPVActiveMins(start, end time.Time) source.QueryResultsChan {
  1457. const queryFmtPVActiveMins = `count(kube_persistentvolume_capacity_bytes{%s}) by (persistentvolume, %s)[%s:%s]`
  1458. // env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
  1459. cfg := pds.promConfig
  1460. resolution := cfg.DataResolution
  1461. resStr := timeutil.DurationString(resolution)
  1462. durStr := timeutil.DurationString(end.Sub(start))
  1463. if durStr == "" {
  1464. panic("failed to parse duration string passed to QueryPVActiveMins")
  1465. }
  1466. queryPVActiveMins := fmt.Sprintf(queryFmtPVActiveMins, cfg.ClusterFilter, cfg.ClusterLabel, durStr, resStr)
  1467. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1468. return ctx.QueryAtTime(queryPVActiveMins, end)
  1469. }
  1470. func (pds *PrometheusDataSource) QueryPVBytes(start, end time.Time) source.QueryResultsChan {
  1471. const queryFmtPVBytes = `avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s])) by (persistentvolume, %s)`
  1472. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  1473. cfg := pds.promConfig
  1474. durStr := timeutil.DurationString(end.Sub(start))
  1475. if durStr == "" {
  1476. panic("failed to parse duration string passed to QueryPVBytes")
  1477. }
  1478. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1479. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1480. return ctx.QueryAtTime(queryPVBytes, end)
  1481. }
  1482. func (pds *PrometheusDataSource) QueryPVCostPerGiBHour(start, end time.Time) source.QueryResultsChan {
  1483. const queryFmtPVCostPerGiBHour = `avg(avg_over_time(pv_hourly_cost{%s}[%s])) by (volumename, %s)`
  1484. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  1485. cfg := pds.promConfig
  1486. durStr := timeutil.DurationString(end.Sub(start))
  1487. if durStr == "" {
  1488. panic("failed to parse duration string passed to QueryPVCostPerGiBHour")
  1489. }
  1490. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1491. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1492. return ctx.QueryAtTime(queryPVCostPerGiBHour, end)
  1493. }
  1494. func (pds *PrometheusDataSource) QueryPVMeta(start, end time.Time) source.QueryResultsChan {
  1495. const queryFmtPVMeta = `avg(avg_over_time(kubecost_pv_info{%s}[%s])) by (%s, persistentvolume, provider_id)`
  1496. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  1497. cfg := pds.promConfig
  1498. durStr := timeutil.DurationString(end.Sub(start))
  1499. if durStr == "" {
  1500. panic("failed to parse duration string passed to QueryPVMeta")
  1501. }
  1502. queryPVMeta := fmt.Sprintf(queryFmtPVMeta, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1503. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1504. return ctx.QueryAtTime(queryPVMeta, end)
  1505. }
  1506. func (pds *PrometheusDataSource) QueryNetZoneGiB(start, end time.Time) source.QueryResultsChan {
  1507. const queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", same_zone="false", same_region="true", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  1508. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
  1509. cfg := pds.promConfig
  1510. durStr := timeutil.DurationString(end.Sub(start))
  1511. if durStr == "" {
  1512. panic("failed to parse duration string passed to QueryNetZoneGiB")
  1513. }
  1514. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1515. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1516. return ctx.QueryAtTime(queryNetZoneGiB, end)
  1517. }
  1518. func (pds *PrometheusDataSource) QueryNetZoneCostPerGiB(start, end time.Time) source.QueryResultsChan {
  1519. const queryFmtNetZoneCostPerGiB = `avg(avg_over_time(kubecost_network_zone_egress_cost{%s}[%s])) by (%s)`
  1520. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
  1521. cfg := pds.promConfig
  1522. durStr := timeutil.DurationString(end.Sub(start))
  1523. if durStr == "" {
  1524. panic("failed to parse duration string passed to QueryNetZoneCostPerGiB")
  1525. }
  1526. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1527. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1528. return ctx.QueryAtTime(queryNetZoneCostPerGiB, end)
  1529. }
  1530. func (pds *PrometheusDataSource) QueryNetRegionGiB(start, end time.Time) source.QueryResultsChan {
  1531. const queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", same_zone="false", same_region="false", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  1532. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
  1533. cfg := pds.promConfig
  1534. durStr := timeutil.DurationString(end.Sub(start))
  1535. if durStr == "" {
  1536. panic("failed to parse duration string passed to QueryNetRegionGiB")
  1537. }
  1538. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1539. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1540. return ctx.QueryAtTime(queryNetRegionGiB, end)
  1541. }
  1542. func (pds *PrometheusDataSource) QueryNetRegionCostPerGiB(start, end time.Time) source.QueryResultsChan {
  1543. const queryFmtNetRegionCostPerGiB = `avg(avg_over_time(kubecost_network_region_egress_cost{%s}[%s])) by (%s)`
  1544. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
  1545. cfg := pds.promConfig
  1546. durStr := timeutil.DurationString(end.Sub(start))
  1547. if durStr == "" {
  1548. panic("failed to parse duration string passed to QueryNetRegionCostPerGiB")
  1549. }
  1550. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1551. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1552. return ctx.QueryAtTime(queryNetRegionCostPerGiB, end)
  1553. }
  1554. func (pds *PrometheusDataSource) QueryNetInternetGiB(start, end time.Time) source.QueryResultsChan {
  1555. const queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  1556. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
  1557. cfg := pds.promConfig
  1558. durStr := timeutil.DurationString(end.Sub(start))
  1559. if durStr == "" {
  1560. panic("failed to parse duration string passed to QueryNetInternetGiB")
  1561. }
  1562. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1563. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1564. return ctx.QueryAtTime(queryNetInternetGiB, end)
  1565. }
  1566. func (pds *PrometheusDataSource) QueryNetInternetCostPerGiB(start, end time.Time) source.QueryResultsChan {
  1567. const queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{%s}[%s])) by (%s)`
  1568. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
  1569. cfg := pds.promConfig
  1570. durStr := timeutil.DurationString(end.Sub(start))
  1571. if durStr == "" {
  1572. panic("failed to parse duration string passed to QueryNetInternetCostPerGiB")
  1573. }
  1574. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1575. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1576. return ctx.QueryAtTime(queryNetInternetCostPerGiB, end)
  1577. }
  1578. func (pds *PrometheusDataSource) QueryNetReceiveBytes(start, end time.Time) source.QueryResultsChan {
  1579. const queryFmtNetReceiveBytes = `sum(increase(container_network_receive_bytes_total{pod!="", %s}[%s])) by (pod_name, pod, namespace, %s)`
  1580. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  1581. cfg := pds.promConfig
  1582. durStr := timeutil.DurationString(end.Sub(start))
  1583. if durStr == "" {
  1584. panic("failed to parse duration string passed to QueryNetReceiveBytes")
  1585. }
  1586. queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1587. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1588. return ctx.QueryAtTime(queryNetReceiveBytes, end)
  1589. }
  1590. func (pds *PrometheusDataSource) QueryNetTransferBytes(start, end time.Time) source.QueryResultsChan {
  1591. const queryFmtNetTransferBytes = `sum(increase(container_network_transmit_bytes_total{pod!="", %s}[%s])) by (pod_name, pod, namespace, %s)`
  1592. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  1593. cfg := pds.promConfig
  1594. durStr := timeutil.DurationString(end.Sub(start))
  1595. if durStr == "" {
  1596. panic("failed to parse duration string passed to QueryNetTransferBytes")
  1597. }
  1598. queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1599. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1600. return ctx.QueryAtTime(queryNetTransferBytes, end)
  1601. }
  1602. func (pds *PrometheusDataSource) QueryNodeLabels2(start, end time.Time) source.QueryResultsChan {
  1603. const queryFmtNodeLabels = `avg_over_time(kube_node_labels{%s}[%s])`
  1604. // env.GetPromClusterFilter(), durStr
  1605. cfg := pds.promConfig
  1606. durStr := timeutil.DurationString(end.Sub(start))
  1607. if durStr == "" {
  1608. panic("failed to parse duration string passed to QueryNodeLabels2")
  1609. }
  1610. queryNodeLabels := fmt.Sprintf(queryFmtNodeLabels, cfg.ClusterFilter, durStr)
  1611. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1612. return ctx.QueryAtTime(queryNodeLabels, end)
  1613. }
  1614. func (pds *PrometheusDataSource) QueryNamespaceLabels(start, end time.Time) source.QueryResultsChan {
  1615. const queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels{%s}[%s])`
  1616. // env.GetPromClusterFilter(), durStr
  1617. cfg := pds.promConfig
  1618. durStr := timeutil.DurationString(end.Sub(start))
  1619. if durStr == "" {
  1620. panic("failed to parse duration string passed to QueryNamespaceLabels")
  1621. }
  1622. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, cfg.ClusterFilter, durStr)
  1623. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1624. return ctx.QueryAtTime(queryNamespaceLabels, end)
  1625. }
  1626. func (pds *PrometheusDataSource) QueryNamespaceAnnotations(start, end time.Time) source.QueryResultsChan {
  1627. const queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations{%s}[%s])`
  1628. // env.GetPromClusterFilter(), durStr
  1629. cfg := pds.promConfig
  1630. durStr := timeutil.DurationString(end.Sub(start))
  1631. if durStr == "" {
  1632. panic("failed to parse duration string passed to QueryNamespaceAnnotations")
  1633. }
  1634. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, cfg.ClusterFilter, durStr)
  1635. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1636. return ctx.QueryAtTime(queryNamespaceAnnotations, end)
  1637. }
  1638. func (pds *PrometheusDataSource) QueryPodLabels(start, end time.Time) source.QueryResultsChan {
  1639. const queryFmtPodLabels = `avg_over_time(kube_pod_labels{%s}[%s])`
  1640. // env.GetPromClusterFilter(), durStr
  1641. cfg := pds.promConfig
  1642. durStr := timeutil.DurationString(end.Sub(start))
  1643. if durStr == "" {
  1644. panic("failed to parse duration string passed to QueryPodLabels")
  1645. }
  1646. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, cfg.ClusterFilter, durStr)
  1647. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1648. return ctx.QueryAtTime(queryPodLabels, end)
  1649. }
  1650. func (pds *PrometheusDataSource) QueryPodAnnotations(start, end time.Time) source.QueryResultsChan {
  1651. const queryFmtPodAnnotations = `avg_over_time(kube_pod_annotations{%s}[%s])`
  1652. // env.GetPromClusterFilter(), durStr
  1653. cfg := pds.promConfig
  1654. durStr := timeutil.DurationString(end.Sub(start))
  1655. if durStr == "" {
  1656. panic("failed to parse duration string passed to QueryPodAnnotations")
  1657. }
  1658. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, cfg.ClusterFilter, durStr)
  1659. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1660. return ctx.QueryAtTime(queryPodAnnotations, end)
  1661. }
  1662. func (pds *PrometheusDataSource) QueryServiceLabels(start, end time.Time) source.QueryResultsChan {
  1663. const queryFmtServiceLabels = `avg_over_time(service_selector_labels{%s}[%s])`
  1664. // env.GetPromClusterFilter(), durStr
  1665. cfg := pds.promConfig
  1666. durStr := timeutil.DurationString(end.Sub(start))
  1667. if durStr == "" {
  1668. panic("failed to parse duration string passed to QueryServiceLabels")
  1669. }
  1670. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, cfg.ClusterFilter, durStr)
  1671. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1672. return ctx.QueryAtTime(queryServiceLabels, end)
  1673. }
  1674. func (pds *PrometheusDataSource) QueryDeploymentLabels(start, end time.Time) source.QueryResultsChan {
  1675. const queryFmtDeploymentLabels = `avg_over_time(deployment_match_labels{%s}[%s])`
  1676. // env.GetPromClusterFilter(), durStr
  1677. cfg := pds.promConfig
  1678. durStr := timeutil.DurationString(end.Sub(start))
  1679. if durStr == "" {
  1680. panic("failed to parse duration string passed to QueryNamespaceAnnotations")
  1681. }
  1682. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, cfg.ClusterFilter, durStr)
  1683. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1684. return ctx.QueryAtTime(queryDeploymentLabels, end)
  1685. }
  1686. func (pds *PrometheusDataSource) QueryStatefulSetLabels(start, end time.Time) source.QueryResultsChan {
  1687. const queryFmtStatefulSetLabels = `avg_over_time(statefulSet_match_labels{%s}[%s])`
  1688. // env.GetPromClusterFilter(), durStr
  1689. cfg := pds.promConfig
  1690. durStr := timeutil.DurationString(end.Sub(start))
  1691. if durStr == "" {
  1692. panic("failed to parse duration string passed to QueryStatefulSetLabels")
  1693. }
  1694. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, cfg.ClusterFilter, durStr)
  1695. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1696. return ctx.QueryAtTime(queryStatefulSetLabels, end)
  1697. }
  1698. func (pds *PrometheusDataSource) QueryDaemonSetLabels(start, end time.Time) source.QueryResultsChan {
  1699. const queryFmtDaemonSetLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet", %s}[%s])) by (pod, owner_name, namespace, %s)`
  1700. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  1701. cfg := pds.promConfig
  1702. durStr := timeutil.DurationString(end.Sub(start))
  1703. if durStr == "" {
  1704. panic("failed to parse duration string passed to QueryDaemonSetLabels")
  1705. }
  1706. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1707. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1708. return ctx.QueryAtTime(queryDaemonSetLabels, end)
  1709. }
  1710. func (pds *PrometheusDataSource) QueryJobLabels(start, end time.Time) source.QueryResultsChan {
  1711. const queryFmtJobLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="Job", %s}[%s])) by (pod, owner_name, namespace ,%s)`
  1712. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  1713. cfg := pds.promConfig
  1714. durStr := timeutil.DurationString(end.Sub(start))
  1715. if durStr == "" {
  1716. panic("failed to parse duration string passed to QueryJobLabels")
  1717. }
  1718. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1719. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1720. return ctx.QueryAtTime(queryJobLabels, end)
  1721. }
  1722. func (pds *PrometheusDataSource) QueryPodsWithReplicaSetOwner(start, end time.Time) source.QueryResultsChan {
  1723. const queryFmtPodsWithReplicaSetOwner = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet", %s}[%s])) by (pod, owner_name, namespace ,%s)`
  1724. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  1725. cfg := pds.promConfig
  1726. durStr := timeutil.DurationString(end.Sub(start))
  1727. if durStr == "" {
  1728. panic("failed to parse duration string passed to QueryPodsWithReplicaSetOwner")
  1729. }
  1730. queryPodsWithReplicaSetOwner := fmt.Sprintf(queryFmtPodsWithReplicaSetOwner, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1731. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1732. return ctx.QueryAtTime(queryPodsWithReplicaSetOwner, end)
  1733. }
  1734. func (pds *PrometheusDataSource) QueryReplicaSetsWithoutOwners(start, end time.Time) source.QueryResultsChan {
  1735. const queryFmtReplicaSetsWithoutOwners = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>", %s}[%s])) by (replicaset, namespace, %s)`
  1736. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  1737. cfg := pds.promConfig
  1738. durStr := timeutil.DurationString(end.Sub(start))
  1739. if durStr == "" {
  1740. panic("failed to parse duration string passed to QueryReplicaSetsWithoutOwners")
  1741. }
  1742. queryReplicaSetsWithoutOwners := fmt.Sprintf(queryFmtReplicaSetsWithoutOwners, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1743. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1744. return ctx.QueryAtTime(queryReplicaSetsWithoutOwners, end)
  1745. }
  1746. func (pds *PrometheusDataSource) QueryReplicaSetsWithRollout(start, end time.Time) source.QueryResultsChan {
  1747. const queryFmtReplicaSetsWithRolloutOwner = `avg(avg_over_time(kube_replicaset_owner{owner_kind="Rollout", %s}[%s])) by (replicaset, namespace, owner_kind, owner_name, %s)`
  1748. // env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  1749. cfg := pds.promConfig
  1750. durStr := timeutil.DurationString(end.Sub(start))
  1751. if durStr == "" {
  1752. panic("failed to parse duration string passed to QueryReplicaSetsWithRollout")
  1753. }
  1754. queryReplicaSetsWithRolloutOwner := fmt.Sprintf(queryFmtReplicaSetsWithRolloutOwner, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
  1755. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1756. return ctx.QueryAtTime(queryReplicaSetsWithRolloutOwner, end)
  1757. }
  1758. func (pds *PrometheusDataSource) QueryDataCoverage(limitDays int) (time.Time, time.Time, error) {
  1759. const (
  1760. queryFmtOldestSample = `min_over_time(timestamp(group(node_cpu_hourly_cost{%s}))[%s:%s])`
  1761. queryFmtNewestSample = `max_over_time(timestamp(group(node_cpu_hourly_cost{%s}))[%s:%s])`
  1762. )
  1763. cfg := pds.promConfig
  1764. now := time.Now()
  1765. durStr := fmt.Sprintf("%dd", limitDays)
  1766. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1767. queryOldest := fmt.Sprintf(queryFmtOldestSample, cfg.ClusterFilter, durStr, "1h")
  1768. resOldestFut := ctx.QueryAtTime(queryOldest, now)
  1769. resOldest, err := resOldestFut.Await()
  1770. if err != nil {
  1771. return time.Time{}, time.Time{}, fmt.Errorf("querying oldest sample: %w", err)
  1772. }
  1773. if len(resOldest) == 0 || len(resOldest[0].Values) == 0 {
  1774. return time.Time{}, time.Time{}, fmt.Errorf("querying oldest sample: %w", err)
  1775. }
  1776. oldest := time.Unix(int64(resOldest[0].Values[0].Value), 0)
  1777. queryNewest := fmt.Sprintf(queryFmtNewestSample, cfg.ClusterFilter, durStr, "1h")
  1778. resNewestFut := ctx.QueryAtTime(queryNewest, now)
  1779. resNewest, err := resNewestFut.Await()
  1780. if err != nil {
  1781. return time.Time{}, time.Time{}, fmt.Errorf("querying newest sample: %w", err)
  1782. }
  1783. if len(resNewest) == 0 || len(resNewest[0].Values) == 0 {
  1784. return time.Time{}, time.Time{}, fmt.Errorf("querying newest sample: %w", err)
  1785. }
  1786. newest := time.Unix(int64(resNewest[0].Values[0].Value), 0)
  1787. return oldest, newest, nil
  1788. }
  1789. func (pds *PrometheusDataSource) QueryIsGPUShared(start, end time.Time) source.QueryResultsChan {
  1790. const queryFmtIsGPUShared = `avg(avg_over_time(kube_pod_container_resource_requests{container!="", node != "", pod != "", container!= "", unit = "integer", %s}[%s])) by (container, pod, namespace, node, resource)`
  1791. // env.GetPromClusterFilter(), durStr
  1792. cfg := pds.promConfig
  1793. durStr := timeutil.DurationString(end.Sub(start))
  1794. if durStr == "" {
  1795. panic("failed to parse duration string passed to QueryIsGPUShared")
  1796. }
  1797. queryIsGPUShared := fmt.Sprintf(queryFmtIsGPUShared, cfg.ClusterFilter, durStr)
  1798. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1799. return ctx.QueryAtTime(queryIsGPUShared, end)
  1800. }
  1801. func (pds *PrometheusDataSource) QueryGetGPUInfo(start, end time.Time) source.QueryResultsChan {
  1802. const queryFmtGetGPUInfo = `avg(avg_over_time(DCGM_FI_DEV_DEC_UTIL{container!="",%s}[%s])) by (container, pod, namespace, device, modelName, UUID)`
  1803. // env.GetPromClusterFilter(), durStr
  1804. cfg := pds.promConfig
  1805. durStr := timeutil.DurationString(end.Sub(start))
  1806. if durStr == "" {
  1807. panic("failed to parse duration string passed to QueryGetGPUInfo")
  1808. }
  1809. queryGetGPUInfo := fmt.Sprintf(queryFmtGetGPUInfo, cfg.ClusterFilter, durStr)
  1810. ctx := pds.promContexts.NewNamedContext(AllocationContextName)
  1811. return ctx.QueryAtTime(queryGetGPUInfo, end)
  1812. }
  1813. func newEmptyResult() source.QueryResultsChan {
  1814. ch := make(source.QueryResultsChan)
  1815. go func() {
  1816. results := source.NewQueryResults("")
  1817. ch <- results
  1818. }()
  1819. return ch
  1820. }
  1821. func wrapResults(query string, results []*source.QueryResult) source.QueryResultsChan {
  1822. ch := make(source.QueryResultsChan)
  1823. go func() {
  1824. r := source.NewQueryResults(query)
  1825. r.Results = results
  1826. ch <- r
  1827. }()
  1828. return ch
  1829. }
  // snapResolutionMinute truncates res down to a whole number of minutes and
  // returns it as a time.Duration. Any input that truncates to zero or fewer
  // minutes (including negative durations) yields exactly one minute.
  func snapResolutionMinute(res time.Duration) time.Duration {
  	wholeMinutes := int64(math.Trunc(res.Minutes()))
  	if wholeMinutes > 0 {
  		return time.Duration(wholeMinutes) * time.Minute
  	}

  	// Never return less than a one-minute resolution.
  	return time.Minute
  }
  // formatResolutionMinutes renders a minute count as a duration string: a
  // count evenly divisible by 60 is written in hours ("<n>h"), any other count
  // is written in minutes ("<n>m").
  func formatResolutionMinutes(resMins int64) string {
  	hours, leftover := resMins/60, resMins%60
  	if leftover != 0 {
  		return fmt.Sprintf("%dm", resMins)
  	}
  	return fmt.Sprintf("%dh", hours)
  }