datasource.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. package prom
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/Masterminds/semver/v3"
  7. "github.com/julienschmidt/httprouter"
  8. "github.com/opencost/opencost/core/pkg/clusters"
  9. "github.com/opencost/opencost/core/pkg/diagnostics"
  10. "github.com/opencost/opencost/core/pkg/log"
  11. "github.com/opencost/opencost/core/pkg/source"
  12. prometheus "github.com/prometheus/client_golang/api"
  13. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  14. )
  15. // creates a new help error which indicates the caller can retry and is non-fatal.
  16. func newHelpRetryError(format string, args ...any) error {
  17. formatWithHelp := format + "\nTroubleshooting help available at: %s"
  18. args = append(args, PrometheusTroubleshootingURL)
  19. cause := fmt.Errorf(formatWithHelp, args...)
  20. return source.NewHelpRetryError(cause)
  21. }
// PrometheusDataSource is the OpenCost data source implementation leveraging Prometheus. Prometheus provides longer retention periods and
// more detailed metrics than the OpenCost Collector, which is useful for historical analysis and cost forecasting.
type PrometheusDataSource struct {
	// promConfig is the Prometheus configuration the data source was constructed with.
	promConfig *OpenCostPrometheusConfig
	// promClient is the Prometheus API client built for the configured server endpoint.
	promClient prometheus.Client
	// promContexts is the factory used to create named query contexts.
	promContexts *ContextFactory
	// metricsQuerier is the querier exposed through Metrics().
	metricsQuerier *PrometheusMetricsQuerier
	// clusterMap is the cluster map exposed through ClusterMap().
	clusterMap clusters.ClusterMap
	// clusterInfo is the cluster info provider exposed through ClusterInfo().
	clusterInfo clusters.ClusterInfoProvider
}
  32. // NewDefaultPrometheusDataSource creates and initializes a new `PrometheusDataSource` with configuration
  33. // parsed from environment variables. This function will block until a connection to prometheus is established,
  34. // or fails. It is recommended to run this function in a goroutine on a retry cycle.
  35. func NewDefaultPrometheusDataSource(clusterInfoProvider clusters.ClusterInfoProvider) (*PrometheusDataSource, error) {
  36. config, err := NewOpenCostPrometheusConfigFromEnv()
  37. if err != nil {
  38. return nil, fmt.Errorf("failed to create prometheus config from env: %w", err)
  39. }
  40. return NewPrometheusDataSource(clusterInfoProvider, config)
  41. }
  42. // NewPrometheusDataSource initializes clients for Prometheus and Thanos, and returns a new PrometheusDataSource.
  43. func NewPrometheusDataSource(infoProvider clusters.ClusterInfoProvider, promConfig *OpenCostPrometheusConfig) (*PrometheusDataSource, error) {
  44. promClient, err := NewPrometheusClient(promConfig.ServerEndpoint, promConfig.ClientConfig)
  45. if err != nil {
  46. return nil, fmt.Errorf("failed to build prometheus client: %w", err)
  47. }
  48. // validation of the prometheus client
  49. m, err := Validate(promClient, promConfig)
  50. if err != nil || !m.Running {
  51. if err != nil {
  52. return nil, newHelpRetryError("failed to query prometheus at %s: %w", promConfig.ServerEndpoint, err)
  53. } else if !m.Running {
  54. return nil, newHelpRetryError("prometheus at %s is not running", promConfig.ServerEndpoint)
  55. }
  56. } else {
  57. log.Infof("Success: retrieved the 'up' query against prometheus at: %s", promConfig.ServerEndpoint)
  58. }
  59. // we don't consider this a fatal error, but we log for visibility
  60. api := prometheusAPI.NewAPI(promClient)
  61. bi, err := api.Buildinfo(context.Background())
  62. if err != nil {
  63. log.Infof("No valid prometheus config file at %s. Error: %s.\nTroubleshooting help available at: %s.\n**Ignore if using cortex/mimir/thanos here**", promConfig.ServerEndpoint, err.Error(), PrometheusTroubleshootingURL)
  64. } else {
  65. log.Infof("Retrieved a prometheus config file from: %s", promConfig.ServerEndpoint)
  66. promConfig.Version = bi.Version
  67. // for versions of prometheus >= 3.0.0, we need to offset the resolution for range queries
  68. // due to a breaking change in prometheus lookback and range query alignment
  69. v, err := semver.NewVersion(promConfig.Version)
  70. if err != nil {
  71. log.Warnf("Failed to parse prometheus version %s. Error: %s", promConfig.Version, err.Error())
  72. } else {
  73. promConfig.IsOffsetResolution = v.Major() >= 3
  74. }
  75. }
  76. // Fix scrape interval if zero by attempting to lookup the interval for the configured job
  77. if promConfig.ScrapeInterval == 0 {
  78. promConfig.ScrapeInterval = time.Minute
  79. // Lookup scrape interval for kubecost job, update if found
  80. si, err := ScrapeIntervalFor(promClient, promConfig.JobName)
  81. if err == nil {
  82. promConfig.ScrapeInterval = si
  83. }
  84. }
  85. log.Infof("Using scrape interval of %f", promConfig.ScrapeInterval.Seconds())
  86. promContexts := NewContextFactory(promClient, promConfig)
  87. // metadata creation for cluster info
  88. metadata := map[string]string{
  89. clusters.ClusterInfoThanosEnabledKey: "false",
  90. }
  91. // cluster info provider
  92. clusterInfoProvider := clusters.NewClusterInfoDecorator(infoProvider, metadata)
  93. clusterMap := newPrometheusClusterMap(promContexts, clusterInfoProvider, 5*time.Minute)
  94. // create metrics querier implementation for prometheus and thanos
  95. metricsQuerier := newPrometheusMetricsQuerier(
  96. promConfig,
  97. promClient,
  98. promContexts,
  99. )
  100. return &PrometheusDataSource{
  101. promConfig: promConfig,
  102. promClient: promClient,
  103. promContexts: promContexts,
  104. metricsQuerier: metricsQuerier,
  105. clusterMap: clusterMap,
  106. clusterInfo: clusterInfoProvider,
  107. }, nil
  108. }
// PrometheusClient returns the Prometheus API client held by this data source.
func (pds *PrometheusDataSource) PrometheusClient() prometheus.Client {
	return pds.promClient
}
// PrometheusConfig returns the Prometheus configuration held by this data source. The stored
// pointer is returned as-is, so it refers to the same configuration the data source uses.
func (pds *PrometheusDataSource) PrometheusConfig() *OpenCostPrometheusConfig {
	return pds.promConfig
}
// PrometheusContexts returns the ContextFactory held by this data source.
func (pds *PrometheusDataSource) PrometheusContexts() *ContextFactory {
	return pds.promContexts
}
// RegisterEndPoints is a no-op: this data source adds no routes to the provided router.
func (pds *PrometheusDataSource) RegisterEndPoints(_ *httprouter.Router) {}
  119. // RegisterDiagnostics registers any custom data source diagnostics with the `DiagnosticService` that can
  120. // be used to report externally.
  121. func (pds *PrometheusDataSource) RegisterDiagnostics(diagService diagnostics.DiagnosticService) {
  122. const PrometheusDiagnosticCategory = "prometheus"
  123. for _, dd := range diagnosticDefinitions {
  124. err := diagService.Register(dd.ID, dd.Description, PrometheusDiagnosticCategory, func(ctx context.Context) (map[string]any, error) {
  125. promDiag := dd.NewDiagnostic(pds.promConfig.ClusterFilter, "")
  126. promContext := pds.promContexts.NewNamedContext(DiagnosticContextName)
  127. e := promDiag.executePrometheusDiagnosticQuery(promContext)
  128. if e != nil {
  129. return nil, fmt.Errorf("failed to execute prometheus diagnostic: %s - %w", dd.ID, e)
  130. }
  131. return promDiag.AsMap(), nil
  132. })
  133. if err != nil {
  134. log.Warnf("Failed to register prometheus diagnostic %s: %s", dd.ID, err.Error())
  135. }
  136. }
  137. }
// RefreshInterval returns the ScrapeInterval from the data source's Prometheus configuration.
func (pds *PrometheusDataSource) RefreshInterval() time.Duration {
	return pds.promConfig.ScrapeInterval
}
// Metrics returns the data source's metrics querier as a source.MetricsQuerier.
func (pds *PrometheusDataSource) Metrics() source.MetricsQuerier {
	return pds.metricsQuerier
}
// ClusterMap returns the clusters.ClusterMap held by this data source.
func (pds *PrometheusDataSource) ClusterMap() clusters.ClusterMap {
	return pds.clusterMap
}
// ClusterInfo returns the ClusterInfoProvider for the local cluster, as stored on the data source.
func (pds *PrometheusDataSource) ClusterInfo() clusters.ClusterInfoProvider {
	return pds.clusterInfo
}
// BatchDuration returns the MaxQueryDuration from the data source's Prometheus configuration.
func (pds *PrometheusDataSource) BatchDuration() time.Duration {
	return pds.promConfig.MaxQueryDuration
}
// Resolution returns the DataResolution from the data source's Prometheus configuration.
func (pds *PrometheusDataSource) Resolution() time.Duration {
	return pds.promConfig.DataResolution
}