datasource.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603
  1. package prom
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "strconv"
  7. "time"
  8. "github.com/Masterminds/semver/v3"
  9. "github.com/julienschmidt/httprouter"
  10. "github.com/opencost/opencost/modules/prometheus-source/pkg/env"
  11. "github.com/opencost/opencost/core/pkg/clusters"
  12. "github.com/opencost/opencost/core/pkg/diagnostics"
  13. "github.com/opencost/opencost/core/pkg/log"
  14. "github.com/opencost/opencost/core/pkg/protocol"
  15. "github.com/opencost/opencost/core/pkg/source"
  16. "github.com/opencost/opencost/core/pkg/util/httputil"
  17. "github.com/opencost/opencost/core/pkg/util/json"
  18. prometheus "github.com/prometheus/client_golang/api"
  19. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  20. )
// Paths of the Prometheus HTTP API endpoints referenced by this package. Every
// ep* constant is apiPrefix followed by an endpoint-specific suffix.
//
// NOTE(review): the ":name" segment in epLabelValues looks like a placeholder
// that a caller substitutes before use — confirm against its call site.
const (
	apiPrefix         = "/api/v1"
	epAlertManagers   = apiPrefix + "/alertmanagers"
	epLabelValues     = apiPrefix + "/label/:name/values"
	epSeries          = apiPrefix + "/series"
	epTargets         = apiPrefix + "/targets"
	epSnapshot        = apiPrefix + "/admin/tsdb/snapshot"
	epDeleteSeries    = apiPrefix + "/admin/tsdb/delete_series"
	epCleanTombstones = apiPrefix + "/admin/tsdb/clean_tombstones"
	epConfig          = apiPrefix + "/status/config"
	epFlags           = apiPrefix + "/status/flags"
	epRules           = apiPrefix + "/rules"
)
  34. // helper for query range proxy requests
  35. func toStartEndStep(qp httputil.QueryParams) (start, end time.Time, step time.Duration, err error) {
  36. var e error
  37. ss := qp.Get("start", "")
  38. es := qp.Get("end", "")
  39. ds := qp.Get("duration", "")
  40. layout := "2006-01-02T15:04:05.000Z"
  41. start, e = time.Parse(layout, ss)
  42. if e != nil {
  43. err = fmt.Errorf("Error parsing time %s. Error: %s", ss, err)
  44. return
  45. }
  46. end, e = time.Parse(layout, es)
  47. if e != nil {
  48. err = fmt.Errorf("Error parsing time %s. Error: %s", es, err)
  49. return
  50. }
  51. step, e = time.ParseDuration(ds)
  52. if e != nil {
  53. err = fmt.Errorf("Error parsing duration %s. Error: %s", ds, err)
  54. return
  55. }
  56. err = nil
  57. return
  58. }
  59. // creates a new help error which indicates the caller can retry and is non-fatal.
  60. func newHelpRetryError(format string, args ...any) error {
  61. formatWithHelp := format + "\nTroubleshooting help available at: %s"
  62. args = append(args, PrometheusTroubleshootingURL)
  63. cause := fmt.Errorf(formatWithHelp, args...)
  64. return source.NewHelpRetryError(cause)
  65. }
// PrometheusDataSource is the OpenCost data source implementation leveraging Prometheus. Prometheus provides longer retention periods and
// more detailed metrics than the OpenCost Collector, which is useful for historical analysis and cost forecasting.
type PrometheusDataSource struct {
	// promConfig is the configuration used for the Prometheus connection.
	promConfig *OpenCostPrometheusConfig
	// promClient is the API client used to talk to Prometheus.
	promClient prometheus.Client
	// promContexts creates named query contexts for Prometheus.
	promContexts *ContextFactory
	// thanosConfig is the Thanos configuration; it may be nil.
	thanosConfig *OpenCostThanosConfig
	// thanosClient is the API client used to talk to Thanos; handlers treat a
	// nil value as "Thanos disabled".
	thanosClient prometheus.Client
	// thanosContexts creates named query contexts for Thanos; it may be nil.
	thanosContexts *ContextFactory
	// metricsQuerier is the value returned by Metrics.
	metricsQuerier *PrometheusMetricsQuerier
	// clusterMap is the value returned by ClusterMap.
	clusterMap clusters.ClusterMap
	// clusterInfo is the value returned by ClusterInfo.
	clusterInfo clusters.ClusterInfoProvider
}
  79. // NewDefaultPrometheusDataSource creates and initializes a new `PrometheusDataSource` with configuration
  80. // parsed from environment variables. This function will block until a connection to prometheus is established,
  81. // or fails. It is recommended to run this function in a goroutine on a retry cycle.
  82. func NewDefaultPrometheusDataSource(clusterInfoProvider clusters.ClusterInfoProvider) (*PrometheusDataSource, error) {
  83. config, err := NewOpenCostPrometheusConfigFromEnv()
  84. if err != nil {
  85. return nil, fmt.Errorf("failed to create prometheus config from env: %w", err)
  86. }
  87. var thanosConfig *OpenCostThanosConfig
  88. if env.IsThanosEnabled() {
  89. // thanos initialization is not fatal, so we log the error and continue
  90. thanosConfig, err = NewOpenCostThanosConfigFromEnv()
  91. if err != nil {
  92. log.Warnf("Thanos was enabled, but failed to create thanos config from env: %s. Continuing...", err.Error())
  93. }
  94. }
  95. return NewPrometheusDataSource(clusterInfoProvider, config, thanosConfig)
  96. }
  97. // NewPrometheusDataSource initializes clients for Prometheus and Thanos, and returns a new PrometheusDataSource.
  98. func NewPrometheusDataSource(infoProvider clusters.ClusterInfoProvider, promConfig *OpenCostPrometheusConfig, thanosConfig *OpenCostThanosConfig) (*PrometheusDataSource, error) {
  99. promClient, err := NewPrometheusClient(promConfig.ServerEndpoint, promConfig.ClientConfig)
  100. if err != nil {
  101. return nil, fmt.Errorf("failed to build prometheus client: %w", err)
  102. }
  103. // validation of the prometheus client
  104. m, err := Validate(promClient, promConfig)
  105. if err != nil || !m.Running {
  106. if err != nil {
  107. return nil, newHelpRetryError("failed to query prometheus at %s: %w", promConfig.ServerEndpoint, err)
  108. } else if !m.Running {
  109. return nil, newHelpRetryError("prometheus at %s is not running", promConfig.ServerEndpoint)
  110. }
  111. } else {
  112. log.Infof("Success: retrieved the 'up' query against prometheus at: %s", promConfig.ServerEndpoint)
  113. }
  114. // we don't consider this a fatal error, but we log for visibility
  115. api := prometheusAPI.NewAPI(promClient)
  116. bi, err := api.Buildinfo(context.Background())
  117. if err != nil {
  118. log.Infof("No valid prometheus config file at %s. Error: %s.\nTroubleshooting help available at: %s.\n**Ignore if using cortex/mimir/thanos here**", promConfig.ServerEndpoint, err.Error(), PrometheusTroubleshootingURL)
  119. } else {
  120. log.Infof("Retrieved a prometheus config file from: %s", promConfig.ServerEndpoint)
  121. promConfig.Version = bi.Version
  122. // for versions of prometheus >= 3.0.0, we need to offset the resolution for range queries
  123. // due to a breaking change in prometheus lookback and range query alignment
  124. v, err := semver.NewVersion(promConfig.Version)
  125. if err != nil {
  126. log.Warnf("Failed to parse prometheus version %s. Error: %s", promConfig.Version, err.Error())
  127. } else {
  128. promConfig.IsOffsetResolution = v.Major() >= 3
  129. }
  130. }
  131. // Fix scrape interval if zero by attempting to lookup the interval for the configured job
  132. if promConfig.ScrapeInterval == 0 {
  133. promConfig.ScrapeInterval = time.Minute
  134. // Lookup scrape interval for kubecost job, update if found
  135. si, err := ScrapeIntervalFor(promClient, promConfig.JobName)
  136. if err == nil {
  137. promConfig.ScrapeInterval = si
  138. }
  139. }
  140. log.Infof("Using scrape interval of %f", promConfig.ScrapeInterval.Seconds())
  141. promContexts := NewContextFactory(promClient, promConfig)
  142. var thanosClient prometheus.Client
  143. var thanosContexts *ContextFactory
  144. // if the thanos configuration is non-nil, we assume intent to use thanos. However, failure to
  145. // initialize the thanos client is not fatal, and we will log the error and continue.
  146. if thanosConfig != nil {
  147. thanosHost := thanosConfig.ServerEndpoint
  148. if thanosHost != "" {
  149. thanosCli, _ := NewThanosClient(thanosHost, thanosConfig)
  150. _, err = Validate(thanosCli, thanosConfig.OpenCostPrometheusConfig)
  151. if err != nil {
  152. log.Warnf("Failed to query Thanos at %s. Error: %s.", thanosHost, err.Error())
  153. thanosClient = thanosCli
  154. } else {
  155. log.Infof("Success: retrieved the 'up' query against Thanos at: %s", thanosHost)
  156. thanosClient = thanosCli
  157. }
  158. thanosContexts = NewContextFactory(thanosClient, thanosConfig.OpenCostPrometheusConfig)
  159. } else {
  160. log.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
  161. }
  162. }
  163. // metadata creation for cluster info
  164. thanosEnabled := thanosClient != nil
  165. metadata := map[string]string{
  166. clusters.ClusterInfoThanosEnabledKey: fmt.Sprintf("%t", thanosEnabled),
  167. }
  168. if thanosEnabled {
  169. metadata[clusters.ClusterInfoThanosOffsetKey] = thanosConfig.Offset
  170. }
  171. // cluster info provider
  172. clusterInfoProvider := clusters.NewClusterInfoDecorator(infoProvider, metadata)
  173. var clusterMap clusters.ClusterMap
  174. if thanosEnabled {
  175. clusterMap = newPrometheusClusterMap(thanosContexts, clusterInfoProvider, 10*time.Minute)
  176. } else {
  177. clusterMap = newPrometheusClusterMap(promContexts, clusterInfoProvider, 5*time.Minute)
  178. }
  179. // create metrics querier implementation for prometheus and thanos
  180. metricsQuerier := newPrometheusMetricsQuerier(
  181. promConfig,
  182. promClient,
  183. promContexts,
  184. thanosConfig,
  185. thanosClient,
  186. thanosContexts,
  187. )
  188. return &PrometheusDataSource{
  189. promConfig: promConfig,
  190. promClient: promClient,
  191. promContexts: promContexts,
  192. thanosConfig: thanosConfig,
  193. thanosClient: thanosClient,
  194. thanosContexts: thanosContexts,
  195. metricsQuerier: metricsQuerier,
  196. clusterMap: clusterMap,
  197. clusterInfo: clusterInfoProvider,
  198. }, nil
  199. }
// proto is the package-wide HTTP protocol helper; the handlers in this file use
// its ToResponse and WriteResponse methods to build and write responses.
var proto = protocol.HTTP()
  201. // prometheusMetadata returns the metadata for the prometheus server
  202. func (pds *PrometheusDataSource) prometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  203. w.Header().Set("Content-Type", "application/json")
  204. w.Header().Set("Access-Control-Allow-Origin", "*")
  205. resp := proto.ToResponse(Validate(pds.promClient, pds.promConfig))
  206. proto.WriteResponse(w, resp)
  207. }
  208. // prometheusRecordingRules is a proxy for /rules against prometheus
  209. func (pds *PrometheusDataSource) prometheusRecordingRules(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  210. w.Header().Set("Content-Type", "application/json")
  211. w.Header().Set("Access-Control-Allow-Origin", "*")
  212. u := pds.promClient.URL(epRules, nil)
  213. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  214. if err != nil {
  215. fmt.Fprintf(w, "error creating Prometheus rule request: %s", err)
  216. return
  217. }
  218. _, body, err := pds.promClient.Do(r.Context(), req)
  219. if err != nil {
  220. fmt.Fprintf(w, "error making Prometheus rule request: %s", err)
  221. return
  222. }
  223. w.Write(body)
  224. }
  225. // prometheusConfig returns the current configuration of the prometheus server
  226. func (pds *PrometheusDataSource) prometheusConfig(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  227. w.Header().Set("Content-Type", "application/json")
  228. w.Header().Set("Access-Control-Allow-Origin", "*")
  229. pConfig := map[string]string{
  230. "address": pds.promConfig.ServerEndpoint,
  231. }
  232. body, err := json.Marshal(pConfig)
  233. if err != nil {
  234. fmt.Fprintf(w, "Error marshalling prometheus config")
  235. } else {
  236. w.Write(body)
  237. }
  238. }
  239. // prometheusTargets is a proxy for /targets against prometheus
  240. func (pds *PrometheusDataSource) prometheusTargets(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  241. w.Header().Set("Content-Type", "application/json")
  242. w.Header().Set("Access-Control-Allow-Origin", "*")
  243. u := pds.promClient.URL(epTargets, nil)
  244. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  245. if err != nil {
  246. fmt.Fprintf(w, "error creating Prometheus rule request: %s", err)
  247. return
  248. }
  249. _, body, err := pds.promClient.Do(r.Context(), req)
  250. if err != nil {
  251. fmt.Fprintf(w, "error making Prometheus rule request: %s", err)
  252. return
  253. }
  254. w.Write(body)
  255. }
  256. // status returns the status of the prometheus client
  257. func (pds *PrometheusDataSource) status(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  258. w.Header().Set("Content-Type", "application/json")
  259. w.Header().Set("Access-Control-Allow-Origin", "*")
  260. promServer := pds.promConfig.ServerEndpoint
  261. api := prometheusAPI.NewAPI(pds.promClient)
  262. result, err := api.Buildinfo(r.Context())
  263. if err != nil {
  264. fmt.Fprintf(w, "Using Prometheus at %s, Error: %s", promServer, err)
  265. } else {
  266. fmt.Fprintf(w, "Using Prometheus at %s, version: %s", promServer, result.Version)
  267. }
  268. }
  269. // prometheusQuery is a proxy for /query against prometheus
  270. func (pds *PrometheusDataSource) prometheusQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  271. w.Header().Set("Content-Type", "application/json")
  272. w.Header().Set("Access-Control-Allow-Origin", "*")
  273. qp := httputil.NewQueryParams(r.URL.Query())
  274. query := qp.Get("query", "")
  275. if query == "" {
  276. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  277. return
  278. }
  279. // Attempt to parse time as either a unix timestamp or as an RFC3339 value
  280. var timeVal time.Time
  281. timeStr := qp.Get("time", "")
  282. if len(timeStr) > 0 {
  283. if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
  284. timeVal = time.Unix(t, 0)
  285. } else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
  286. timeVal = t
  287. }
  288. // If time is given, but not parse-able, return an error
  289. if timeVal.IsZero() {
  290. http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
  291. }
  292. }
  293. ctx := pds.promContexts.NewNamedContext(FrontendContextName)
  294. body, err := ctx.RawQuery(query, timeVal)
  295. if err != nil {
  296. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  297. return
  298. }
  299. w.Write(body) // prometheusQueryRange is a proxy for /query_range against prometheus
  300. }
  301. func (pds *PrometheusDataSource) prometheusQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  302. w.Header().Set("Content-Type", "application/json")
  303. w.Header().Set("Access-Control-Allow-Origin", "*")
  304. qp := httputil.NewQueryParams(r.URL.Query())
  305. query := qp.Get("query", "")
  306. if query == "" {
  307. fmt.Fprintf(w, "Error parsing query from request parameters.")
  308. return
  309. }
  310. start, end, duration, err := toStartEndStep(qp)
  311. if err != nil {
  312. fmt.Fprintf(w, "error: %s", err)
  313. return
  314. }
  315. ctx := pds.promContexts.NewNamedContext(FrontendContextName)
  316. body, err := ctx.RawQueryRange(query, start, end, duration)
  317. if err != nil {
  318. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  319. return
  320. }
  321. w.Write(body)
  322. }
  323. // thanosQuery is a proxy for /query against thanos
  324. func (pds *PrometheusDataSource) thanosQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  325. w.Header().Set("Content-Type", "application/json")
  326. w.Header().Set("Access-Control-Allow-Origin", "*")
  327. if pds.thanosClient == nil {
  328. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("ThanosDisabled")))
  329. return
  330. }
  331. qp := httputil.NewQueryParams(r.URL.Query())
  332. query := qp.Get("query", "")
  333. if query == "" {
  334. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  335. return
  336. }
  337. // Attempt to parse time as either a unix timestamp or as an RFC3339 value
  338. var timeVal time.Time
  339. timeStr := qp.Get("time", "")
  340. if len(timeStr) > 0 {
  341. if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
  342. timeVal = time.Unix(t, 0)
  343. } else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
  344. timeVal = t
  345. }
  346. // If time is given, but not parse-able, return an error
  347. if timeVal.IsZero() {
  348. http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
  349. }
  350. }
  351. ctx := pds.thanosContexts.NewNamedContext(FrontendContextName)
  352. body, err := ctx.RawQuery(query, timeVal)
  353. if err != nil {
  354. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  355. return
  356. }
  357. w.Write(body)
  358. }
  359. // thanosQueryRange is a proxy for /query_range against thanos
  360. func (pds *PrometheusDataSource) thanosQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  361. w.Header().Set("Content-Type", "application/json")
  362. w.Header().Set("Access-Control-Allow-Origin", "*")
  363. if pds.thanosClient == nil {
  364. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("ThanosDisabled")))
  365. return
  366. }
  367. qp := httputil.NewQueryParams(r.URL.Query())
  368. query := qp.Get("query", "")
  369. if query == "" {
  370. fmt.Fprintf(w, "Error parsing query from request parameters.")
  371. return
  372. }
  373. start, end, duration, err := toStartEndStep(qp)
  374. if err != nil {
  375. fmt.Fprintf(w, "error: %s", err)
  376. return
  377. }
  378. ctx := pds.thanosContexts.NewNamedContext(FrontendContextName)
  379. body, err := ctx.RawQueryRange(query, start, end, duration)
  380. if err != nil {
  381. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  382. return
  383. }
  384. w.Write(body)
  385. }
  386. // promtheusQueueState returns the current state of the prometheus and thanos request queues
  387. func (pds *PrometheusDataSource) prometheusQueueState(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  388. w.Header().Set("Content-Type", "application/json")
  389. w.Header().Set("Access-Control-Allow-Origin", "*")
  390. promQueueState, err := GetPrometheusQueueState(pds.promClient, pds.promConfig)
  391. if err != nil {
  392. proto.WriteResponse(w, proto.ToResponse(nil, err))
  393. return
  394. }
  395. result := map[string]*PrometheusQueueState{
  396. "prometheus": promQueueState,
  397. }
  398. if pds.thanosClient != nil {
  399. thanosQueueState, err := GetPrometheusQueueState(pds.thanosClient, pds.thanosConfig.OpenCostPrometheusConfig)
  400. if err != nil {
  401. log.Warnf("Error getting Thanos queue state: %s", err)
  402. } else {
  403. result["thanos"] = thanosQueueState
  404. }
  405. }
  406. proto.WriteResponse(w, proto.ToResponse(result, nil))
  407. }
  408. // prometheusMetrics retrieves availability of Prometheus and Thanos metrics
  409. func (pds *PrometheusDataSource) prometheusMetrics(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  410. w.Header().Set("Content-Type", "application/json")
  411. w.Header().Set("Access-Control-Allow-Origin", "*")
  412. promMetrics := GetPrometheusMetrics(pds.promClient, pds.promConfig, "")
  413. result := map[string][]*PrometheusDiagnostic{
  414. "prometheus": promMetrics,
  415. }
  416. if pds.thanosClient != nil {
  417. thanosMetrics := GetPrometheusMetrics(pds.thanosClient, pds.thanosConfig.OpenCostPrometheusConfig, pds.thanosConfig.Offset)
  418. result["thanos"] = thanosMetrics
  419. }
  420. proto.WriteResponse(w, proto.ToResponse(result, nil))
  421. }
// PrometheusClient returns the prometheus API client held by this data source.
func (pds *PrometheusDataSource) PrometheusClient() prometheus.Client {
	return pds.promClient
}
// PrometheusConfig returns the prometheus configuration held by this data
// source. The pointer itself is returned, not a copy.
func (pds *PrometheusDataSource) PrometheusConfig() *OpenCostPrometheusConfig {
	return pds.promConfig
}
// PrometheusContexts returns the context factory used for prometheus queries.
func (pds *PrometheusDataSource) PrometheusContexts() *ContextFactory {
	return pds.promContexts
}
// ThanosClient returns the thanos API client held by this data source. The
// handlers in this file treat a nil client as "thanos disabled".
func (pds *PrometheusDataSource) ThanosClient() prometheus.Client {
	return pds.thanosClient
}
// ThanosConfig returns the thanos configuration held by this data source; it
// may be nil. The pointer itself is returned, not a copy.
func (pds *PrometheusDataSource) ThanosConfig() *OpenCostThanosConfig {
	return pds.thanosConfig
}
// ThanosContexts returns the context factory used for thanos queries; it may be
// nil.
func (pds *PrometheusDataSource) ThanosContexts() *ContextFactory {
	return pds.thanosContexts
}
  440. func (pds *PrometheusDataSource) RegisterEndPoints(router *httprouter.Router) {
  441. // endpoints migrated from server
  442. router.GET("/validatePrometheus", pds.prometheusMetadata)
  443. router.GET("/prometheusRecordingRules", pds.prometheusRecordingRules)
  444. router.GET("/prometheusConfig", pds.prometheusConfig)
  445. router.GET("/prometheusTargets", pds.prometheusTargets)
  446. router.GET("/status", pds.status)
  447. // prom query proxies
  448. router.GET("/prometheusQuery", pds.prometheusQuery)
  449. router.GET("/prometheusQueryRange", pds.prometheusQueryRange)
  450. router.GET("/thanosQuery", pds.thanosQuery)
  451. router.GET("/thanosQueryRange", pds.thanosQueryRange)
  452. // diagnostics
  453. router.GET("/diagnostics/requestQueue", pds.prometheusQueueState)
  454. router.GET("/diagnostics/prometheusMetrics", pds.prometheusMetrics)
  455. }
  456. // RegisterDiagnostics registers any custom data source diagnostics with the `DiagnosticService` that can
  457. // be used to report externally.
  458. func (pds *PrometheusDataSource) RegisterDiagnostics(diagService diagnostics.DiagnosticService) {
  459. const PrometheusDiagnosticCategory = "prometheus"
  460. for _, dd := range diagnosticDefinitions {
  461. err := diagService.Register(dd.ID, dd.Description, PrometheusDiagnosticCategory, func(ctx context.Context) (map[string]any, error) {
  462. promDiag := dd.NewDiagnostic(pds.promConfig.ClusterFilter, "")
  463. promContext := pds.promContexts.NewNamedContext(DiagnosticContextName)
  464. e := promDiag.executePrometheusDiagnosticQuery(promContext)
  465. if e != nil {
  466. return nil, fmt.Errorf("failed to execute prometheus diagnostic: %s - %w", dd.ID, e)
  467. }
  468. return promDiag.AsMap(), nil
  469. })
  470. if err != nil {
  471. log.Warnf("Failed to register prometheus diagnostic %s: %s", dd.ID, err.Error())
  472. }
  473. }
  474. }
// RefreshInterval returns the scrape interval from the prometheus configuration.
func (pds *PrometheusDataSource) RefreshInterval() time.Duration {
	return pds.promConfig.ScrapeInterval
}
// Metrics returns the metrics querier held by this data source.
func (pds *PrometheusDataSource) Metrics() source.MetricsQuerier {
	return pds.metricsQuerier
}
// ClusterMap returns the cluster map held by this data source.
func (pds *PrometheusDataSource) ClusterMap() clusters.ClusterMap {
	return pds.clusterMap
}
// ClusterInfo returns the ClusterInfoProvider for the local cluster, as stored
// on this data source.
func (pds *PrometheusDataSource) ClusterInfo() clusters.ClusterInfoProvider {
	return pds.clusterInfo
}
// BatchDuration returns the maximum query duration from the prometheus
// configuration.
func (pds *PrometheusDataSource) BatchDuration() time.Duration {
	return pds.promConfig.MaxQueryDuration
}
// Resolution returns the data resolution from the prometheus configuration.
func (pds *PrometheusDataSource) Resolution() time.Duration {
	return pds.promConfig.DataResolution
}