// config.go (6.9 KB)

  1. package prom
  2. import (
  3. "crypto/x509"
  4. "fmt"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/core/pkg/util/timeutil"
  8. "github.com/opencost/opencost/modules/prometheus-source/pkg/env"
  9. restclient "k8s.io/client-go/rest"
  10. certutil "k8s.io/client-go/util/cert"
  11. )
  12. const (
  13. ServiceCA = `/var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt`
  14. )
  15. type OpenCostPrometheusConfig struct {
  16. ServerEndpoint string
  17. Version string
  18. IsOffsetResolution bool
  19. ClientConfig *PrometheusClientConfig
  20. ScrapeInterval time.Duration
  21. JobName string
  22. Offset string
  23. QueryOffset time.Duration
  24. MaxQueryDuration time.Duration
  25. ClusterLabel string
  26. ClusterID string
  27. ClusterFilter string
  28. DataResolution time.Duration
  29. DataResolutionMinutes int
  30. }
  31. type OpenCostThanosConfig struct {
  32. *OpenCostPrometheusConfig
  33. MaxSourceResulution string
  34. }
  35. func (ocpc *OpenCostPrometheusConfig) IsRateLimitRetryEnabled() bool {
  36. return ocpc.ClientConfig.RateLimitRetryOpts != nil
  37. }
  38. // NewOpenCostPrometheusConfigFromEnv creates a new OpenCostPrometheusConfig from environment variables.
  39. func NewOpenCostPrometheusConfigFromEnv() (*OpenCostPrometheusConfig, error) {
  40. serverEndpoint := env.GetPrometheusServerEndpoint()
  41. if serverEndpoint == "" {
  42. return nil, fmt.Errorf("no address for prometheus set in $%s", env.PrometheusServerEndpointEnvVar)
  43. }
  44. queryConcurrency := env.GetMaxQueryConcurrency()
  45. log.Infof("Prometheus Client Max Concurrency set to %d", queryConcurrency)
  46. timeout := env.GetPrometheusQueryTimeout()
  47. keepAlive := env.GetPrometheusKeepAlive()
  48. tlsHandshakeTimeout := env.GetPrometheusTLSHandshakeTimeout()
  49. jobName := env.GetJobName()
  50. scrapeInterval := env.GetScrapeInterval()
  51. maxQueryDuration := env.GetETLMaxPrometheusQueryDuration()
  52. clusterId := env.GetClusterID()
  53. clusterLabel := env.GetPromClusterLabel()
  54. clusterFilter := env.GetPromClusterFilter()
  55. var rateLimitRetryOpts *RateLimitRetryOpts = nil
  56. if env.IsPrometheusRetryOnRateLimitResponse() {
  57. rateLimitRetryOpts = &RateLimitRetryOpts{
  58. MaxRetries: env.GetPrometheusRetryOnRateLimitMaxRetries(),
  59. DefaultRetryWait: env.GetPrometheusRetryOnRateLimitDefaultWait(),
  60. }
  61. }
  62. auth := &ClientAuth{
  63. Username: env.GetDBBasicAuthUsername(),
  64. Password: env.GetDBBasicAuthUserPassword(),
  65. BearerToken: env.GetDBBearerToken(),
  66. }
  67. // We will use the service account token and service-ca.crt to authenticate with the Prometheus server via kube-rbac-proxy.
  68. // We need to ensure that the service account has the necessary permissions to access the Prometheus server by binding it to the appropriate role.
  69. var tlsCaCert *x509.CertPool
  70. if env.IsKubeRbacProxyEnabled() {
  71. restConfig, err := restclient.InClusterConfig()
  72. if err != nil {
  73. log.Errorf("%s was set to true but failed to get in-cluster config: %s", env.KubeRbacProxyEnabledEnvVar, err)
  74. }
  75. auth.BearerToken = restConfig.BearerToken
  76. tlsCaCert, err = certutil.NewPool(ServiceCA)
  77. if err != nil {
  78. log.Errorf("%s was set to true but failed to load service-ca.crt: %s", env.KubeRbacProxyEnabledEnvVar, err)
  79. }
  80. }
  81. dataResolution := env.GetETLResolution()
  82. // Ensuring if data resolution is less than 60s default it to 1m
  83. resolutionMinutes := int(dataResolution.Minutes())
  84. if resolutionMinutes == 0 {
  85. resolutionMinutes = 1
  86. }
  87. clientConfig := &PrometheusClientConfig{
  88. Timeout: timeout,
  89. KeepAlive: keepAlive,
  90. TLSHandshakeTimeout: tlsHandshakeTimeout,
  91. TLSInsecureSkipVerify: env.IsInsecureSkipVerify(),
  92. RootCAs: tlsCaCert,
  93. RateLimitRetryOpts: rateLimitRetryOpts,
  94. Auth: auth,
  95. QueryConcurrency: queryConcurrency,
  96. QueryLogFile: "",
  97. HeaderXScopeOrgId: env.GetPrometheusHeaderXScopeOrgId(),
  98. }
  99. return &OpenCostPrometheusConfig{
  100. ServerEndpoint: serverEndpoint,
  101. Version: "0.0.0",
  102. IsOffsetResolution: false,
  103. ClientConfig: clientConfig,
  104. ScrapeInterval: scrapeInterval,
  105. JobName: jobName,
  106. Offset: "",
  107. QueryOffset: time.Duration(0),
  108. MaxQueryDuration: maxQueryDuration,
  109. ClusterLabel: clusterLabel,
  110. ClusterID: clusterId,
  111. ClusterFilter: clusterFilter,
  112. DataResolution: dataResolution,
  113. DataResolutionMinutes: resolutionMinutes,
  114. }, nil
  115. }
  116. // NewOpenCostPrometheusConfigFromEnv creates a new OpenCostPrometheusConfig from environment variables.
  117. func NewOpenCostThanosConfigFromEnv() (*OpenCostThanosConfig, error) {
  118. serverEndpoint := env.GetThanosQueryUrl()
  119. if serverEndpoint == "" {
  120. return nil, fmt.Errorf("no address for thanos set in $%s", env.ThanosQueryUrlEnvVar)
  121. }
  122. queryConcurrency := env.GetMaxQueryConcurrency()
  123. log.Infof("Thanos Client Max Concurrency set to %d", queryConcurrency)
  124. timeout := env.GetPrometheusQueryTimeout()
  125. keepAlive := env.GetPrometheusKeepAlive()
  126. tlsHandshakeTimeout := env.GetPrometheusTLSHandshakeTimeout()
  127. jobName := env.GetJobName()
  128. scrapeInterval := env.GetScrapeInterval()
  129. maxQueryDuration := env.GetETLMaxPrometheusQueryDuration()
  130. clusterLabel := env.GetPromClusterLabel()
  131. var rateLimitRetryOpts *RateLimitRetryOpts = nil
  132. if env.IsPrometheusRetryOnRateLimitResponse() {
  133. rateLimitRetryOpts = &RateLimitRetryOpts{
  134. MaxRetries: env.GetPrometheusRetryOnRateLimitMaxRetries(),
  135. DefaultRetryWait: env.GetPrometheusRetryOnRateLimitDefaultWait(),
  136. }
  137. }
  138. auth := &ClientAuth{
  139. Username: env.GetMultiClusterBasicAuthUsername(),
  140. Password: env.GetMultiClusterBasicAuthPassword(),
  141. BearerToken: env.GetMultiClusterBearerToken(),
  142. }
  143. clientConfig := &PrometheusClientConfig{
  144. Timeout: timeout,
  145. KeepAlive: keepAlive,
  146. TLSHandshakeTimeout: tlsHandshakeTimeout,
  147. TLSInsecureSkipVerify: env.IsInsecureSkipVerify(),
  148. RateLimitRetryOpts: rateLimitRetryOpts,
  149. Auth: auth,
  150. QueryConcurrency: queryConcurrency,
  151. QueryLogFile: env.GetQueryLoggingFile(),
  152. HeaderXScopeOrgId: "",
  153. RootCAs: nil,
  154. }
  155. thanosQueryOffset := env.GetThanosOffset()
  156. d, err := timeutil.ParseDuration(thanosQueryOffset)
  157. if err != nil {
  158. return nil, fmt.Errorf("failed to parse thanos query offset: %w", err)
  159. }
  160. dataResolution := env.GetETLResolution()
  161. return &OpenCostThanosConfig{
  162. OpenCostPrometheusConfig: &OpenCostPrometheusConfig{
  163. ServerEndpoint: serverEndpoint,
  164. Version: "0.0.0",
  165. IsOffsetResolution: false,
  166. ClientConfig: clientConfig,
  167. ScrapeInterval: scrapeInterval,
  168. JobName: jobName,
  169. Offset: thanosQueryOffset,
  170. QueryOffset: d,
  171. MaxQueryDuration: maxQueryDuration,
  172. ClusterID: "", // thanos is multi-cluster
  173. ClusterFilter: "", // thanos is multi-cluster
  174. ClusterLabel: clusterLabel,
  175. DataResolution: dataResolution,
  176. },
  177. MaxSourceResulution: env.GetThanosMaxSourceResolution(),
  178. }, nil
  179. }