config.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package prom
  2. import (
  3. "crypto/x509"
  4. "fmt"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/core/pkg/util/timeutil"
  8. "github.com/opencost/opencost/modules/prometheus-source/pkg/env"
  9. restclient "k8s.io/client-go/rest"
  10. certutil "k8s.io/client-go/util/cert"
  11. )
  // ServiceCA is the path of the service CA certificate file under the service
  // account secrets directory. It is loaded as the root CA pool for the
  // Prometheus client when kube-rbac-proxy authentication is enabled.
  const ServiceCA = "/var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt"
  15. type OpenCostPrometheusConfig struct {
  16. ServerEndpoint string
  17. ClientConfig *PrometheusClientConfig
  18. ScrapeInterval time.Duration
  19. JobName string
  20. Offset string
  21. QueryOffset time.Duration
  22. MaxQueryDuration time.Duration
  23. ClusterLabel string
  24. ClusterID string
  25. ClusterFilter string
  26. DataResolution time.Duration
  27. DataResolutionMinutes int
  28. }
  29. type OpenCostThanosConfig struct {
  30. *OpenCostPrometheusConfig
  31. MaxSourceResulution string
  32. }
  33. func (ocpc *OpenCostPrometheusConfig) IsRateLimitRetryEnabled() bool {
  34. return ocpc.ClientConfig.RateLimitRetryOpts != nil
  35. }
  36. // NewOpenCostPrometheusConfigFromEnv creates a new OpenCostPrometheusConfig from environment variables.
  37. func NewOpenCostPrometheusConfigFromEnv() (*OpenCostPrometheusConfig, error) {
  38. serverEndpoint := env.GetPrometheusServerEndpoint()
  39. if serverEndpoint == "" {
  40. return nil, fmt.Errorf("no address for prometheus set in $%s", env.PrometheusServerEndpointEnvVar)
  41. }
  42. queryConcurrency := env.GetMaxQueryConcurrency()
  43. log.Infof("Prometheus Client Max Concurrency set to %d", queryConcurrency)
  44. timeout := env.GetPrometheusQueryTimeout()
  45. keepAlive := env.GetPrometheusKeepAlive()
  46. tlsHandshakeTimeout := env.GetPrometheusTLSHandshakeTimeout()
  47. jobName := env.GetJobName()
  48. scrapeInterval := env.GetScrapeInterval()
  49. maxQueryDuration := env.GetETLMaxPrometheusQueryDuration()
  50. clusterId := env.GetClusterID()
  51. clusterLabel := env.GetPromClusterLabel()
  52. clusterFilter := env.GetPromClusterFilter()
  53. var rateLimitRetryOpts *RateLimitRetryOpts = nil
  54. if env.IsPrometheusRetryOnRateLimitResponse() {
  55. rateLimitRetryOpts = &RateLimitRetryOpts{
  56. MaxRetries: env.GetPrometheusRetryOnRateLimitMaxRetries(),
  57. DefaultRetryWait: env.GetPrometheusRetryOnRateLimitDefaultWait(),
  58. }
  59. }
  60. auth := &ClientAuth{
  61. Username: env.GetDBBasicAuthUsername(),
  62. Password: env.GetDBBasicAuthUserPassword(),
  63. BearerToken: env.GetDBBearerToken(),
  64. }
  65. // We will use the service account token and service-ca.crt to authenticate with the Prometheus server via kube-rbac-proxy.
  66. // We need to ensure that the service account has the necessary permissions to access the Prometheus server by binding it to the appropriate role.
  67. var tlsCaCert *x509.CertPool
  68. if env.IsKubeRbacProxyEnabled() {
  69. restConfig, err := restclient.InClusterConfig()
  70. if err != nil {
  71. log.Errorf("%s was set to true but failed to get in-cluster config: %s", env.KubeRbacProxyEnabledEnvVar, err)
  72. }
  73. auth.BearerToken = restConfig.BearerToken
  74. tlsCaCert, err = certutil.NewPool(ServiceCA)
  75. if err != nil {
  76. log.Errorf("%s was set to true but failed to load service-ca.crt: %s", env.KubeRbacProxyEnabledEnvVar, err)
  77. }
  78. }
  79. dataResolution := env.GetETLResolution()
  80. // Ensuring if data resolution is less than 60s default it to 1m
  81. resolutionMinutes := int(dataResolution.Minutes())
  82. if resolutionMinutes == 0 {
  83. resolutionMinutes = 1
  84. }
  85. clientConfig := &PrometheusClientConfig{
  86. Timeout: timeout,
  87. KeepAlive: keepAlive,
  88. TLSHandshakeTimeout: tlsHandshakeTimeout,
  89. TLSInsecureSkipVerify: env.IsInsecureSkipVerify(),
  90. RootCAs: tlsCaCert,
  91. RateLimitRetryOpts: rateLimitRetryOpts,
  92. Auth: auth,
  93. QueryConcurrency: queryConcurrency,
  94. QueryLogFile: "",
  95. HeaderXScopeOrgId: env.GetPrometheusHeaderXScopeOrgId(),
  96. }
  97. return &OpenCostPrometheusConfig{
  98. ServerEndpoint: serverEndpoint,
  99. ClientConfig: clientConfig,
  100. ScrapeInterval: scrapeInterval,
  101. JobName: jobName,
  102. Offset: "",
  103. QueryOffset: time.Duration(0),
  104. MaxQueryDuration: maxQueryDuration,
  105. ClusterLabel: clusterLabel,
  106. ClusterID: clusterId,
  107. ClusterFilter: clusterFilter,
  108. DataResolution: dataResolution,
  109. DataResolutionMinutes: resolutionMinutes,
  110. }, nil
  111. }
  112. // NewOpenCostPrometheusConfigFromEnv creates a new OpenCostPrometheusConfig from environment variables.
  113. func NewOpenCostThanosConfigFromEnv() (*OpenCostThanosConfig, error) {
  114. serverEndpoint := env.GetThanosQueryUrl()
  115. if serverEndpoint == "" {
  116. return nil, fmt.Errorf("no address for thanos set in $%s", env.ThanosQueryUrlEnvVar)
  117. }
  118. queryConcurrency := env.GetMaxQueryConcurrency()
  119. log.Infof("Thanos Client Max Concurrency set to %d", queryConcurrency)
  120. timeout := env.GetPrometheusQueryTimeout()
  121. keepAlive := env.GetPrometheusKeepAlive()
  122. tlsHandshakeTimeout := env.GetPrometheusTLSHandshakeTimeout()
  123. jobName := env.GetJobName()
  124. scrapeInterval := env.GetScrapeInterval()
  125. maxQueryDuration := env.GetETLMaxPrometheusQueryDuration()
  126. clusterLabel := env.GetPromClusterLabel()
  127. var rateLimitRetryOpts *RateLimitRetryOpts = nil
  128. if env.IsPrometheusRetryOnRateLimitResponse() {
  129. rateLimitRetryOpts = &RateLimitRetryOpts{
  130. MaxRetries: env.GetPrometheusRetryOnRateLimitMaxRetries(),
  131. DefaultRetryWait: env.GetPrometheusRetryOnRateLimitDefaultWait(),
  132. }
  133. }
  134. auth := &ClientAuth{
  135. Username: env.GetMultiClusterBasicAuthUsername(),
  136. Password: env.GetMultiClusterBasicAuthPassword(),
  137. BearerToken: env.GetMultiClusterBearerToken(),
  138. }
  139. clientConfig := &PrometheusClientConfig{
  140. Timeout: timeout,
  141. KeepAlive: keepAlive,
  142. TLSHandshakeTimeout: tlsHandshakeTimeout,
  143. TLSInsecureSkipVerify: env.IsInsecureSkipVerify(),
  144. RateLimitRetryOpts: rateLimitRetryOpts,
  145. Auth: auth,
  146. QueryConcurrency: queryConcurrency,
  147. QueryLogFile: env.GetQueryLoggingFile(),
  148. HeaderXScopeOrgId: "",
  149. RootCAs: nil,
  150. }
  151. thanosQueryOffset := env.GetThanosOffset()
  152. d, err := timeutil.ParseDuration(thanosQueryOffset)
  153. if err != nil {
  154. return nil, fmt.Errorf("failed to parse thanos query offset: %w", err)
  155. }
  156. dataResolution := env.GetETLResolution()
  157. return &OpenCostThanosConfig{
  158. OpenCostPrometheusConfig: &OpenCostPrometheusConfig{
  159. ServerEndpoint: serverEndpoint,
  160. ClientConfig: clientConfig,
  161. ScrapeInterval: scrapeInterval,
  162. JobName: jobName,
  163. Offset: thanosQueryOffset,
  164. QueryOffset: d,
  165. MaxQueryDuration: maxQueryDuration,
  166. ClusterID: "", // thanos is multi-cluster
  167. ClusterFilter: "", // thanos is multi-cluster
  168. ClusterLabel: clusterLabel,
  169. DataResolution: dataResolution,
  170. },
  171. MaxSourceResulution: env.GetThanosMaxSourceResolution(),
  172. }, nil
  173. }