datasource.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578
  1. package prom
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "strconv"
  7. "time"
  8. "github.com/Masterminds/semver/v3"
  9. "github.com/julienschmidt/httprouter"
  10. "github.com/opencost/opencost/modules/prometheus-source/pkg/env"
  11. "github.com/opencost/opencost/core/pkg/clusters"
  12. "github.com/opencost/opencost/core/pkg/log"
  13. "github.com/opencost/opencost/core/pkg/protocol"
  14. "github.com/opencost/opencost/core/pkg/source"
  15. "github.com/opencost/opencost/core/pkg/util/httputil"
  16. "github.com/opencost/opencost/core/pkg/util/json"
  17. prometheus "github.com/prometheus/client_golang/api"
  18. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  19. )
  20. const (
  21. apiPrefix = "/api/v1"
  22. epAlertManagers = apiPrefix + "/alertmanagers"
  23. epLabelValues = apiPrefix + "/label/:name/values"
  24. epSeries = apiPrefix + "/series"
  25. epTargets = apiPrefix + "/targets"
  26. epSnapshot = apiPrefix + "/admin/tsdb/snapshot"
  27. epDeleteSeries = apiPrefix + "/admin/tsdb/delete_series"
  28. epCleanTombstones = apiPrefix + "/admin/tsdb/clean_tombstones"
  29. epConfig = apiPrefix + "/status/config"
  30. epFlags = apiPrefix + "/status/flags"
  31. epRules = apiPrefix + "/rules"
  32. )
  33. // helper for query range proxy requests
  34. func toStartEndStep(qp httputil.QueryParams) (start, end time.Time, step time.Duration, err error) {
  35. var e error
  36. ss := qp.Get("start", "")
  37. es := qp.Get("end", "")
  38. ds := qp.Get("duration", "")
  39. layout := "2006-01-02T15:04:05.000Z"
  40. start, e = time.Parse(layout, ss)
  41. if e != nil {
  42. err = fmt.Errorf("Error parsing time %s. Error: %s", ss, err)
  43. return
  44. }
  45. end, e = time.Parse(layout, es)
  46. if e != nil {
  47. err = fmt.Errorf("Error parsing time %s. Error: %s", es, err)
  48. return
  49. }
  50. step, e = time.ParseDuration(ds)
  51. if e != nil {
  52. err = fmt.Errorf("Error parsing duration %s. Error: %s", ds, err)
  53. return
  54. }
  55. err = nil
  56. return
  57. }
  58. // creates a new help error which indicates the caller can retry and is non-fatal.
  59. func newHelpRetryError(format string, args ...any) error {
  60. formatWithHelp := format + "\nTroubleshooting help available at: %s"
  61. args = append(args, PrometheusTroubleshootingURL)
  62. cause := fmt.Errorf(formatWithHelp, args...)
  63. return source.NewHelpRetryError(cause)
  64. }
  65. // PrometheusDataSource is the OpenCost data source implementation leveraging Prometheus. Prometheus provides longer retention periods and
  66. // more detailed metrics than the OpenCost Collector, which is useful for historical analysis and cost forecasting.
  67. type PrometheusDataSource struct {
  68. promConfig *OpenCostPrometheusConfig
  69. promClient prometheus.Client
  70. promContexts *ContextFactory
  71. thanosConfig *OpenCostThanosConfig
  72. thanosClient prometheus.Client
  73. thanosContexts *ContextFactory
  74. metricsQuerier *PrometheusMetricsQuerier
  75. clusterMap clusters.ClusterMap
  76. clusterInfo clusters.ClusterInfoProvider
  77. }
  78. // NewDefaultPrometheusDataSource creates and initializes a new `PrometheusDataSource` with configuration
  79. // parsed from environment variables. This function will block until a connection to prometheus is established,
  80. // or fails. It is recommended to run this function in a goroutine on a retry cycle.
  81. func NewDefaultPrometheusDataSource(clusterInfoProvider clusters.ClusterInfoProvider) (*PrometheusDataSource, error) {
  82. config, err := NewOpenCostPrometheusConfigFromEnv()
  83. if err != nil {
  84. return nil, fmt.Errorf("failed to create prometheus config from env: %w", err)
  85. }
  86. var thanosConfig *OpenCostThanosConfig
  87. if env.IsThanosEnabled() {
  88. // thanos initialization is not fatal, so we log the error and continue
  89. thanosConfig, err = NewOpenCostThanosConfigFromEnv()
  90. if err != nil {
  91. log.Warnf("Thanos was enabled, but failed to create thanos config from env: %s. Continuing...", err.Error())
  92. }
  93. }
  94. return NewPrometheusDataSource(clusterInfoProvider, config, thanosConfig)
  95. }
  96. // NewPrometheusDataSource initializes clients for Prometheus and Thanos, and returns a new PrometheusDataSource.
  97. func NewPrometheusDataSource(infoProvider clusters.ClusterInfoProvider, promConfig *OpenCostPrometheusConfig, thanosConfig *OpenCostThanosConfig) (*PrometheusDataSource, error) {
  98. promClient, err := NewPrometheusClient(promConfig.ServerEndpoint, promConfig.ClientConfig)
  99. if err != nil {
  100. return nil, fmt.Errorf("failed to build prometheus client: %w", err)
  101. }
  102. // validation of the prometheus client
  103. m, err := Validate(promClient, promConfig)
  104. if err != nil || !m.Running {
  105. if err != nil {
  106. return nil, newHelpRetryError("failed to query prometheus at %s: %w", promConfig.ServerEndpoint, err)
  107. } else if !m.Running {
  108. return nil, newHelpRetryError("prometheus at %s is not running", promConfig.ServerEndpoint)
  109. }
  110. } else {
  111. log.Infof("Success: retrieved the 'up' query against prometheus at: %s", promConfig.ServerEndpoint)
  112. }
  113. // we don't consider this a fatal error, but we log for visibility
  114. api := prometheusAPI.NewAPI(promClient)
  115. bi, err := api.Buildinfo(context.Background())
  116. if err != nil {
  117. log.Infof("No valid prometheus config file at %s. Error: %s.\nTroubleshooting help available at: %s.\n**Ignore if using cortex/mimir/thanos here**", promConfig.ServerEndpoint, err.Error(), PrometheusTroubleshootingURL)
  118. } else {
  119. log.Infof("Retrieved a prometheus config file from: %s", promConfig.ServerEndpoint)
  120. promConfig.Version = bi.Version
  121. // for versions of prometheus >= 3.0.0, we need to offset the resolution for range queries
  122. // due to a breaking change in prometheus lookback and range query alignment
  123. v, err := semver.NewVersion(promConfig.Version)
  124. if err != nil {
  125. log.Warnf("Failed to parse prometheus version %s. Error: %s", promConfig.Version, err.Error())
  126. } else {
  127. promConfig.IsOffsetResolution = v.Major() >= 3
  128. }
  129. }
  130. // Fix scrape interval if zero by attempting to lookup the interval for the configured job
  131. if promConfig.ScrapeInterval == 0 {
  132. promConfig.ScrapeInterval = time.Minute
  133. // Lookup scrape interval for kubecost job, update if found
  134. si, err := ScrapeIntervalFor(promClient, promConfig.JobName)
  135. if err == nil {
  136. promConfig.ScrapeInterval = si
  137. }
  138. }
  139. log.Infof("Using scrape interval of %f", promConfig.ScrapeInterval.Seconds())
  140. promContexts := NewContextFactory(promClient, promConfig)
  141. var thanosClient prometheus.Client
  142. var thanosContexts *ContextFactory
  143. // if the thanos configuration is non-nil, we assume intent to use thanos. However, failure to
  144. // initialize the thanos client is not fatal, and we will log the error and continue.
  145. if thanosConfig != nil {
  146. thanosHost := thanosConfig.ServerEndpoint
  147. if thanosHost != "" {
  148. thanosCli, _ := NewThanosClient(thanosHost, thanosConfig)
  149. _, err = Validate(thanosCli, thanosConfig.OpenCostPrometheusConfig)
  150. if err != nil {
  151. log.Warnf("Failed to query Thanos at %s. Error: %s.", thanosHost, err.Error())
  152. thanosClient = thanosCli
  153. } else {
  154. log.Infof("Success: retrieved the 'up' query against Thanos at: %s", thanosHost)
  155. thanosClient = thanosCli
  156. }
  157. thanosContexts = NewContextFactory(thanosClient, thanosConfig.OpenCostPrometheusConfig)
  158. } else {
  159. log.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
  160. }
  161. }
  162. // metadata creation for cluster info
  163. thanosEnabled := thanosClient != nil
  164. metadata := map[string]string{
  165. clusters.ClusterInfoThanosEnabledKey: fmt.Sprintf("%t", thanosEnabled),
  166. }
  167. if thanosEnabled {
  168. metadata[clusters.ClusterInfoThanosOffsetKey] = thanosConfig.Offset
  169. }
  170. // cluster info provider
  171. clusterInfoProvider := clusters.NewClusterInfoDecorator(infoProvider, metadata)
  172. var clusterMap clusters.ClusterMap
  173. if thanosEnabled {
  174. clusterMap = newPrometheusClusterMap(thanosContexts, clusterInfoProvider, 10*time.Minute)
  175. } else {
  176. clusterMap = newPrometheusClusterMap(promContexts, clusterInfoProvider, 5*time.Minute)
  177. }
  178. // create metrics querier implementation for prometheus and thanos
  179. metricsQuerier := newPrometheusMetricsQuerier(
  180. promConfig,
  181. promClient,
  182. promContexts,
  183. thanosConfig,
  184. thanosClient,
  185. thanosContexts,
  186. )
  187. return &PrometheusDataSource{
  188. promConfig: promConfig,
  189. promClient: promClient,
  190. promContexts: promContexts,
  191. thanosConfig: thanosConfig,
  192. thanosClient: thanosClient,
  193. thanosContexts: thanosContexts,
  194. metricsQuerier: metricsQuerier,
  195. clusterMap: clusterMap,
  196. clusterInfo: clusterInfoProvider,
  197. }, nil
  198. }
  199. var proto = protocol.HTTP()
  200. // prometheusMetadata returns the metadata for the prometheus server
  201. func (pds *PrometheusDataSource) prometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  202. w.Header().Set("Content-Type", "application/json")
  203. w.Header().Set("Access-Control-Allow-Origin", "*")
  204. resp := proto.ToResponse(Validate(pds.promClient, pds.promConfig))
  205. proto.WriteResponse(w, resp)
  206. }
  207. // prometheusRecordingRules is a proxy for /rules against prometheus
  208. func (pds *PrometheusDataSource) prometheusRecordingRules(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  209. w.Header().Set("Content-Type", "application/json")
  210. w.Header().Set("Access-Control-Allow-Origin", "*")
  211. u := pds.promClient.URL(epRules, nil)
  212. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  213. if err != nil {
  214. fmt.Fprintf(w, "error creating Prometheus rule request: %s", err)
  215. return
  216. }
  217. _, body, err := pds.promClient.Do(r.Context(), req)
  218. if err != nil {
  219. fmt.Fprintf(w, "error making Prometheus rule request: %s", err)
  220. return
  221. }
  222. w.Write(body)
  223. }
  224. // prometheusConfig returns the current configuration of the prometheus server
  225. func (pds *PrometheusDataSource) prometheusConfig(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  226. w.Header().Set("Content-Type", "application/json")
  227. w.Header().Set("Access-Control-Allow-Origin", "*")
  228. pConfig := map[string]string{
  229. "address": pds.promConfig.ServerEndpoint,
  230. }
  231. body, err := json.Marshal(pConfig)
  232. if err != nil {
  233. fmt.Fprintf(w, "Error marshalling prometheus config")
  234. } else {
  235. w.Write(body)
  236. }
  237. }
  238. // prometheusTargets is a proxy for /targets against prometheus
  239. func (pds *PrometheusDataSource) prometheusTargets(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  240. w.Header().Set("Content-Type", "application/json")
  241. w.Header().Set("Access-Control-Allow-Origin", "*")
  242. u := pds.promClient.URL(epTargets, nil)
  243. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  244. if err != nil {
  245. fmt.Fprintf(w, "error creating Prometheus rule request: %s", err)
  246. return
  247. }
  248. _, body, err := pds.promClient.Do(r.Context(), req)
  249. if err != nil {
  250. fmt.Fprintf(w, "error making Prometheus rule request: %s", err)
  251. return
  252. }
  253. w.Write(body)
  254. }
  255. // status returns the status of the prometheus client
  256. func (pds *PrometheusDataSource) status(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  257. w.Header().Set("Content-Type", "application/json")
  258. w.Header().Set("Access-Control-Allow-Origin", "*")
  259. promServer := pds.promConfig.ServerEndpoint
  260. api := prometheusAPI.NewAPI(pds.promClient)
  261. result, err := api.Buildinfo(r.Context())
  262. if err != nil {
  263. fmt.Fprintf(w, "Using Prometheus at %s, Error: %s", promServer, err)
  264. } else {
  265. fmt.Fprintf(w, "Using Prometheus at %s, version: %s", promServer, result.Version)
  266. }
  267. }
  268. // prometheusQuery is a proxy for /query against prometheus
  269. func (pds *PrometheusDataSource) prometheusQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  270. w.Header().Set("Content-Type", "application/json")
  271. w.Header().Set("Access-Control-Allow-Origin", "*")
  272. qp := httputil.NewQueryParams(r.URL.Query())
  273. query := qp.Get("query", "")
  274. if query == "" {
  275. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  276. return
  277. }
  278. // Attempt to parse time as either a unix timestamp or as an RFC3339 value
  279. var timeVal time.Time
  280. timeStr := qp.Get("time", "")
  281. if len(timeStr) > 0 {
  282. if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
  283. timeVal = time.Unix(t, 0)
  284. } else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
  285. timeVal = t
  286. }
  287. // If time is given, but not parse-able, return an error
  288. if timeVal.IsZero() {
  289. http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
  290. }
  291. }
  292. ctx := pds.promContexts.NewNamedContext(FrontendContextName)
  293. body, err := ctx.RawQuery(query, timeVal)
  294. if err != nil {
  295. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  296. return
  297. }
  298. w.Write(body) // prometheusQueryRange is a proxy for /query_range against prometheus
  299. }
  300. func (pds *PrometheusDataSource) prometheusQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  301. w.Header().Set("Content-Type", "application/json")
  302. w.Header().Set("Access-Control-Allow-Origin", "*")
  303. qp := httputil.NewQueryParams(r.URL.Query())
  304. query := qp.Get("query", "")
  305. if query == "" {
  306. fmt.Fprintf(w, "Error parsing query from request parameters.")
  307. return
  308. }
  309. start, end, duration, err := toStartEndStep(qp)
  310. if err != nil {
  311. fmt.Fprintf(w, "error: %s", err)
  312. return
  313. }
  314. ctx := pds.promContexts.NewNamedContext(FrontendContextName)
  315. body, err := ctx.RawQueryRange(query, start, end, duration)
  316. if err != nil {
  317. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  318. return
  319. }
  320. w.Write(body)
  321. }
  322. // thanosQuery is a proxy for /query against thanos
  323. func (pds *PrometheusDataSource) thanosQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  324. w.Header().Set("Content-Type", "application/json")
  325. w.Header().Set("Access-Control-Allow-Origin", "*")
  326. if pds.thanosClient == nil {
  327. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("ThanosDisabled")))
  328. return
  329. }
  330. qp := httputil.NewQueryParams(r.URL.Query())
  331. query := qp.Get("query", "")
  332. if query == "" {
  333. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  334. return
  335. }
  336. // Attempt to parse time as either a unix timestamp or as an RFC3339 value
  337. var timeVal time.Time
  338. timeStr := qp.Get("time", "")
  339. if len(timeStr) > 0 {
  340. if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
  341. timeVal = time.Unix(t, 0)
  342. } else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
  343. timeVal = t
  344. }
  345. // If time is given, but not parse-able, return an error
  346. if timeVal.IsZero() {
  347. http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
  348. }
  349. }
  350. ctx := pds.thanosContexts.NewNamedContext(FrontendContextName)
  351. body, err := ctx.RawQuery(query, timeVal)
  352. if err != nil {
  353. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  354. return
  355. }
  356. w.Write(body)
  357. }
  358. // thanosQueryRange is a proxy for /query_range against thanos
  359. func (pds *PrometheusDataSource) thanosQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  360. w.Header().Set("Content-Type", "application/json")
  361. w.Header().Set("Access-Control-Allow-Origin", "*")
  362. if pds.thanosClient == nil {
  363. proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("ThanosDisabled")))
  364. return
  365. }
  366. qp := httputil.NewQueryParams(r.URL.Query())
  367. query := qp.Get("query", "")
  368. if query == "" {
  369. fmt.Fprintf(w, "Error parsing query from request parameters.")
  370. return
  371. }
  372. start, end, duration, err := toStartEndStep(qp)
  373. if err != nil {
  374. fmt.Fprintf(w, "error: %s", err)
  375. return
  376. }
  377. ctx := pds.thanosContexts.NewNamedContext(FrontendContextName)
  378. body, err := ctx.RawQueryRange(query, start, end, duration)
  379. if err != nil {
  380. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  381. return
  382. }
  383. w.Write(body)
  384. }
  385. // promtheusQueueState returns the current state of the prometheus and thanos request queues
  386. func (pds *PrometheusDataSource) prometheusQueueState(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  387. w.Header().Set("Content-Type", "application/json")
  388. w.Header().Set("Access-Control-Allow-Origin", "*")
  389. promQueueState, err := GetPrometheusQueueState(pds.promClient, pds.promConfig)
  390. if err != nil {
  391. proto.WriteResponse(w, proto.ToResponse(nil, err))
  392. return
  393. }
  394. result := map[string]*PrometheusQueueState{
  395. "prometheus": promQueueState,
  396. }
  397. if pds.thanosClient != nil {
  398. thanosQueueState, err := GetPrometheusQueueState(pds.thanosClient, pds.thanosConfig.OpenCostPrometheusConfig)
  399. if err != nil {
  400. log.Warnf("Error getting Thanos queue state: %s", err)
  401. } else {
  402. result["thanos"] = thanosQueueState
  403. }
  404. }
  405. proto.WriteResponse(w, proto.ToResponse(result, nil))
  406. }
  407. // prometheusMetrics retrieves availability of Prometheus and Thanos metrics
  408. func (pds *PrometheusDataSource) prometheusMetrics(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  409. w.Header().Set("Content-Type", "application/json")
  410. w.Header().Set("Access-Control-Allow-Origin", "*")
  411. promMetrics := GetPrometheusMetrics(pds.promClient, pds.promConfig, "")
  412. result := map[string][]*PrometheusDiagnostic{
  413. "prometheus": promMetrics,
  414. }
  415. if pds.thanosClient != nil {
  416. thanosMetrics := GetPrometheusMetrics(pds.thanosClient, pds.thanosConfig.OpenCostPrometheusConfig, pds.thanosConfig.Offset)
  417. result["thanos"] = thanosMetrics
  418. }
  419. proto.WriteResponse(w, proto.ToResponse(result, nil))
  420. }
  421. func (pds *PrometheusDataSource) PrometheusClient() prometheus.Client {
  422. return pds.promClient
  423. }
  424. func (pds *PrometheusDataSource) PrometheusConfig() *OpenCostPrometheusConfig {
  425. return pds.promConfig
  426. }
  427. func (pds *PrometheusDataSource) PrometheusContexts() *ContextFactory {
  428. return pds.promContexts
  429. }
  430. func (pds *PrometheusDataSource) ThanosClient() prometheus.Client {
  431. return pds.thanosClient
  432. }
  433. func (pds *PrometheusDataSource) ThanosConfig() *OpenCostThanosConfig {
  434. return pds.thanosConfig
  435. }
  436. func (pds *PrometheusDataSource) ThanosContexts() *ContextFactory {
  437. return pds.thanosContexts
  438. }
  439. func (pds *PrometheusDataSource) RegisterEndPoints(router *httprouter.Router) {
  440. // endpoints migrated from server
  441. router.GET("/validatePrometheus", pds.prometheusMetadata)
  442. router.GET("/prometheusRecordingRules", pds.prometheusRecordingRules)
  443. router.GET("/prometheusConfig", pds.prometheusConfig)
  444. router.GET("/prometheusTargets", pds.prometheusTargets)
  445. router.GET("/status", pds.status)
  446. // prom query proxies
  447. router.GET("/prometheusQuery", pds.prometheusQuery)
  448. router.GET("/prometheusQueryRange", pds.prometheusQueryRange)
  449. router.GET("/thanosQuery", pds.thanosQuery)
  450. router.GET("/thanosQueryRange", pds.thanosQueryRange)
  451. // diagnostics
  452. router.GET("/diagnostics/requestQueue", pds.prometheusQueueState)
  453. router.GET("/diagnostics/prometheusMetrics", pds.prometheusMetrics)
  454. }
  455. func (pds *PrometheusDataSource) RefreshInterval() time.Duration {
  456. return pds.promConfig.ScrapeInterval
  457. }
  458. func (pds *PrometheusDataSource) Metrics() source.MetricsQuerier {
  459. return pds.metricsQuerier
  460. }
  461. func (pds *PrometheusDataSource) ClusterMap() clusters.ClusterMap {
  462. return pds.clusterMap
  463. }
  464. // ClusterInfo returns the ClusterInfoProvider for the local cluster.
  465. func (pds *PrometheusDataSource) ClusterInfo() clusters.ClusterInfoProvider {
  466. return pds.clusterInfo
  467. }
  468. func (pds *PrometheusDataSource) BatchDuration() time.Duration {
  469. return pds.promConfig.MaxQueryDuration
  470. }
  471. func (pds *PrometheusDataSource) Resolution() time.Duration {
  472. return pds.promConfig.DataResolution
  473. }